metaproxy 1.22.1
filter_frontend_net.cpp
Go to the documentation of this file.
/* This file is part of Metaproxy.
   Copyright (C) Index Data

Metaproxy is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Software Foundation; either version 2, or (at your option) any later
version.

Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/
18
19#include "config.hpp"
20
21#if HAVE_GETRLIMIT
22#include <sys/resource.h>
23#endif
24#include <sstream>
25#include <iomanip>
26#include <metaproxy/util.hpp>
27#include "pipe.hpp"
28#include <metaproxy/filter.hpp>
29#include <metaproxy/package.hpp>
32#include <yazpp/z-assoc.h>
33#include <yazpp/pdu-assoc.h>
34#include <yazpp/socket-manager.h>
35#include <yazpp/limit-connect.h>
36#include <yaz/timing.h>
37#include <yaz/log.h>
38#include <yaz/daemon.h>
39#include <yaz/malloc_info.h>
40#include "gduutil.hpp"
41#include <signal.h>
42#include <stdlib.h>
43#include <iostream>
44
45namespace mp = metaproxy_1;
46namespace yf = metaproxy_1::filter;
47
// Declarations of the helper classes nested in FrontendNet.
// NOTE(review): this listing carries fused line numbers and several lines
// are absent (the embedded numbering skips 50, 61, 63, 65, 71, ...), so some
// class headers and members are not visible here. Confirm against the
// original file before editing.
48namespace metaproxy_1 {
49 namespace filter {
// Presumably the body of FrontendNet::PeerStat (its header line is not in
// this view): get/add/remove are defined later in this file and operate on
// Item entries holding a peer name and a count.
51 friend class FrontendNet;
52 PeerStat();
53 ~PeerStat();
54 size_t get(const std::string &peer);
55 size_t add(const std::string &peer);
56 size_t remove(const std::string &peer);
57 class Item {
58 friend class PeerStat;
59 std::string m_peer;
60 size_t cnt;
62 };
64 };
// Presumably the body of FrontendNet::Port (header not in this view):
// listen address, route name and certificate file name of one port.
66 friend class Rep;
67 friend class FrontendNet;
68 std::string port;
69 std::string route;
70 std::string cert_fname;
72 };
// Presumably the body of FrontendNet::IP_Pattern (header not in this view):
// an IP pattern with an associated integer value.
74 friend class Rep;
75 friend class FrontendNet;
76 std::string pattern;
78 int value;
79 };
// Presumably the body of FrontendNet::Rep (header not in this view), given
// the Rep()/~Rep() declarations below.
81 friend class FrontendNet;
82
86 std::vector<Port> m_ports;
88 std::list<IP_Pattern> session_timeout;
89 std::list<IP_Pattern> connect_max;
90 std::list<IP_Pattern> connect_total;
91 std::list<IP_Pattern> http_req_max;
92 std::string m_msg_config;
93 std::string m_stat_req;
94 yazpp_1::SocketManager mySocketManager;
96 yazpp_1::PDU_Assoc **pdu;
// Ascending duration limits terminated by a 0.0 entry (filled in Rep::Rep).
98 double m_duration_lim[22];
104 public:
105 Rep();
106 ~Rep();
107 };
// Socket observer whose socketNotify() sets a flag that timeout() reports.
108 class FrontendNet::My_Timer_Thread : public yazpp_1::ISocketObserver {
109 private:
110 yazpp_1::ISocketObservable *m_obs;
113 public:
114 My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
115 void socketNotify(int event);
116 bool timeout();
117 };
// Association object created per accepted connection
// (see ZAssocServer::sessionNotify).
118 class FrontendNet::ZAssocChild : public yazpp_1::Z_Assoc {
119 public:
120 ~ZAssocChild();
121 ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
123 const mp::Package *package,
124 Port *port,
125 Rep *rep,
126 yazpp_1::LimitConnect &limit,
127 const char *peername);
130 private:
131 yazpp_1::IPDU_Observer* sessionNotify(
132 yazpp_1::IPDU_Observable *the_PDU_Observable,
133 int fd);
134 void recv_GDU(Z_GDU *apdu, int len);
135 void report(Z_HTTP_Request *hreq);
136 void failNotify();
137 void timeoutNotify();
138 void connectNotify();
139 private:
141 mp::Session m_session;
142 mp::Origin m_origin;
144 const mp::Package *m_package;
146 yazpp_1::LimitConnect &m_limit_http_req;
147 std::string m_peer;
148 };
// Presumably the body of FrontendNet::ThreadPoolPackage (header not in this
// view): the unit of work handed to the thread pool.
150 public:
151 ThreadPoolPackage(mp::Package *package,
152 yf::FrontendNet::ZAssocChild *ses,
153 Rep *rep);
156 void result(const char *t_info);
157 bool cleanup(void *info);
158 private:
159 yaz_timing_t timer;
161 mp::Package *m_package;
163 };
// Listening association; creates a ZAssocChild for each accepted session.
164 class FrontendNet::ZAssocServer : public yazpp_1::Z_Assoc {
165 public:
167 ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
168 FrontendNet::Port *port,
169 Rep *rep);
170 void set_package(const mp::Package *package);
172 private:
173 yazpp_1::IPDU_Observer* sessionNotify(
174 yazpp_1::IPDU_Observable *the_PDU_Observable,
175 int fd);
176 void recv_GDU(Z_GDU *apdu, int len);
177
178 void failNotify();
179 void timeoutNotify();
180 void connectNotify();
181 private:
183 const mp::Package *m_package;
184 yazpp_1::LimitConnect limit_connect;
185 yazpp_1::LimitConnect limit_http_req;
188 };
189 }
190}
191
192yf::FrontendNet::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
193 ZAssocChild *ses,
194 Rep *rep) :
195 m_assoc_child(ses), m_package(package), m_p(rep)
196{
197 timer = yaz_timing_create();
198}
199
200yf::FrontendNet::ThreadPoolPackage::~ThreadPoolPackage()
201{
202 yaz_timing_destroy(&timer); // timer may be NULL
203 delete m_package;
204}
205
206bool yf::FrontendNet::ThreadPoolPackage::cleanup(void *info)
207{
208 mp::Session *ses = (mp::Session *) info;
209 if (*ses == m_package->session())
210 {
211 m_assoc_child->m_no_requests--;
212 return true;
213 }
214 return false;
215}
216
217void yf::FrontendNet::ThreadPoolPackage::result(const char *t_info)
218{
219 m_assoc_child->m_no_requests--;
220
221 yazpp_1::GDU *gdu = &m_package->response();
222
223 if (gdu->get())
224 {
225 int len;
226 m_assoc_child->send_GDU(gdu->get(), &len);
227
228 yaz_timing_stop(timer);
229 double duration = yaz_timing_get_real(timer);
230
231 size_t ent = 0;
232 while (m_p->m_duration_lim[ent] != 0.0 && duration > m_p->m_duration_lim[ent])
233 ent++;
234 m_p->m_duration_freq[ent]++;
235
236 m_p->m_duration_total += duration;
237
238 if (m_p->m_duration_max < duration)
239 m_p->m_duration_max = duration;
240
241 if (m_p->m_duration_min == 0.0 || m_p->m_duration_min > duration)
242 m_p->m_duration_min = duration;
243
244 if (m_p->m_msg_config.length())
245 {
246 Z_GDU *z_gdu = gdu->get();
247
248 std::ostringstream os;
249 os << m_p->m_msg_config << " "
250 << *m_package << " "
251 << std::fixed << std::setprecision (6) << duration << " ";
252
253 if (z_gdu)
254 os << *z_gdu;
255 else
256 os << "-";
257
258 yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
259 }
260 }
261 else if (!m_package->session().is_closed())
262 {
263 // no response package and yet the session is still open..
264 // means that request is unhandled..
265 yazpp_1::GDU *gdu_req = &m_package->request();
266 Z_GDU *z_gdu = gdu_req->get();
267 if (z_gdu && z_gdu->which == Z_GDU_Z3950)
268 {
269 // For Z39.50, response with a Close and shutdown
270 mp::odr odr;
271 int len;
272 Z_APDU *apdu_response = odr.create_close(
273 z_gdu->u.z3950, Z_Close_systemProblem,
274 "unhandled Z39.50 request");
275
276 m_assoc_child->send_Z_PDU(apdu_response, &len);
277 }
278 else if (z_gdu && z_gdu->which == Z_GDU_HTTP_Request)
279 {
280 // For HTTP, respond with Server Error
281 int len;
282 mp::odr odr;
283 Z_GDU *zgdu_res
284 = odr.create_HTTP_Response(m_package->session(),
285 z_gdu->u.HTTP_Request, 500);
286 m_assoc_child->send_GDU(zgdu_res, &len);
287 }
288 m_package->session().close();
289 }
290
291 if (m_assoc_child->m_no_requests == 0 && m_package->session().is_closed())
292 {
293 m_assoc_child->close();
294 }
295
296
297 delete this;
298}
299
300mp::IThreadPoolMsg *yf::FrontendNet::ThreadPoolPackage::handle()
301{
302 m_package->move(m_assoc_child->m_port->route);
303 return this;
304}
305
// Construct the association object for one accepted connection.
// Stores the thread pool, template package, port and peer name, sets the
// origin address to "<peername> <port>" and applies the session timeout:
// 300 seconds unless a configured timeout pattern matches the peer, in
// which case the first matching pattern's value is used.
// NOTE(review): peername is used unchecked by the m_peer initializer and by
// addr.append() before the `if (peername)` test further down; the caller
// visible in this file (ZAssocServer::sessionNotify) never passes null.
306yf::FrontendNet::ZAssocChild::ZAssocChild(
307 yazpp_1::IPDU_Observable *PDU_Observable,
308 mp::ThreadPoolSocketObserver *my_thread_pool,
309 const mp::Package *package,
310 Port *port, Rep *rep,
311 yazpp_1::LimitConnect &limit_http_req,
312 const char *peername)
313 : Z_Assoc(PDU_Observable), m_p(rep), m_limit_http_req(limit_http_req)
314 , m_peer(peername)
315{
316 m_thread_pool_observer = my_thread_pool;
317 m_no_requests = 0;
318 m_delete_flag = false;
319 m_package = package;
320 m_port = port;
321 std::string addr;
322 addr.append(peername);
323 addr.append(" ");
324 addr.append(port->port);
// NOTE(review): the embedded listing numbers skip 325 here, so a statement
// may be missing from this view - confirm against the original file.
326 m_origin.set_tcpip_address(addr, m_session.id());
327 int session_timeout = 300; // 5 minutes
328 if (peername) {
329 std::list<IP_Pattern>::const_iterator it = m_p->session_timeout.begin();
// first matching pattern wins
330 for (; it != m_p->session_timeout.end(); it++)
331 if (mp::util::match_ip(it->pattern, peername))
332 {
333 if (it->verbose > 1)
334 yaz_log(YLOG_LOG, "timeout pattern=%s ip=%s value=%d",
335 it->pattern.c_str(),
336 peername, it->value);
337 session_timeout = it->value;
338 break;
339 }
340 }
341 timeout(session_timeout);
342}
343
344yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocChild::sessionNotify(
345 yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
346{
347 return 0;
348}
349
350yf::FrontendNet::ZAssocChild::~ZAssocChild()
351{
352 m_p->m_peerStat.remove(m_peer);
353}
354
355void yf::FrontendNet::ZAssocChild::report(Z_HTTP_Request *hreq)
356{
357 mp::odr o;
358
359 Z_GDU *gdu_res = o.create_HTTP_Response(m_session, hreq, 200);
360
361 Z_HTTP_Response *hres = gdu_res->u.HTTP_Response;
362
363 mp::wrbuf w;
364 size_t i;
365 int number_total = 0;
366
367 for (i = 0; m_p->m_duration_lim[i] != 0.0; i++)
368 number_total += m_p->m_duration_freq[i];
369 number_total += m_p->m_duration_freq[i];
370
371 wrbuf_puts(w, "<?xml version=\"1.0\"?>\n");
372 wrbuf_puts(w, "<frontend_net>\n");
373 wrbuf_printf(w, " <responses frequency=\"%d\">\n", number_total);
374 for (i = 0; m_p->m_duration_lim[i] != 0.0; i++)
375 {
376 if (m_p->m_duration_freq[i] > 0)
377 wrbuf_printf(
378 w, " <response duration_start=\"%f\" "
379 "duration_end=\"%f\" frequency=\"%d\"/>\n",
380 i > 0 ? m_p->m_duration_lim[i - 1] : 0.0,
381 m_p->m_duration_lim[i], m_p->m_duration_freq[i]);
382 }
383
384 if (m_p->m_duration_freq[i] > 0)
385 wrbuf_printf(
386 w, " <response duration_start=\"%f\" frequency=\"%d\"/>\n",
387 m_p->m_duration_lim[i - 1], m_p->m_duration_freq[i]);
388
389 if (m_p->m_duration_max != 0.0)
390 wrbuf_printf(
391 w, " <response duration_max=\"%f\"/>\n",
392 m_p->m_duration_max);
393 if (m_p->m_duration_min != 0.0)
394 wrbuf_printf(
395 w, " <response duration_min=\"%f\"/>\n",
396 m_p->m_duration_min);
397 if (m_p->m_duration_total != 0.0)
398 wrbuf_printf(
399 w, " <response duration_average=\"%f\"/>\n",
400 m_p->m_duration_total / number_total);
401
402 wrbuf_puts(w, " </responses>\n");
403
404 int thread_busy;
405 int thread_total;
406 m_thread_pool_observer->get_thread_info(thread_busy, thread_total);
407
408 wrbuf_printf(w, " <thread_info busy=\"%d\" total=\"%d\"/>\n",
409 thread_busy, thread_total);
410
411 wrbuf_malloc_info(w);
412
413 {
414 char buf[200];
415 if (nmem_get_status(buf, sizeof(buf) - 1) == 0)
416 wrbuf_puts(w, buf);
417 }
418 wrbuf_puts(w, "</frontend_net>\n");
419
420 hres->content_len = w.len();
421 hres->content_buf = (char *) w.buf();
422
423 int len;
424 send_GDU(gdu_res, &len);
425}
426
427void yf::FrontendNet::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
428{
429 m_no_requests++;
430
431 mp::Package *p = new mp::Package(m_session, m_origin);
432
433 if (z_pdu && z_pdu->which == Z_GDU_HTTP_Request)
434 {
435 Z_HTTP_Request *hreq = z_pdu->u.HTTP_Request;
436
437 const char *f = z_HTTP_header_lookup(hreq->headers, "X-Forwarded-For");
438 if (f)
439 p->origin().set_tcpip_address(std::string(f), m_session.id());
440
441 if (m_p->m_stat_req.length()
442 && !strcmp(hreq->path, m_p->m_stat_req.c_str()))
443 {
444 report(hreq);
445 delete p;
446 delete this;
447 return;
448 }
449 }
450
451 p->copy_route(*m_package);
452 p->request() = yazpp_1::GDU(z_pdu);
453
454 if (m_p->m_msg_config.length())
455 {
456 if (z_pdu)
457 {
458 std::ostringstream os;
459 os << m_p->m_msg_config << " "
460 << *p << " "
461 << "0.000000" << " "
462 << *z_pdu;
463 yaz_log(YLOG_LOG, "%s", os.str().c_str());
464 }
465 }
466 if (z_pdu && z_pdu->which == Z_GDU_HTTP_Request)
467 {
468 Z_HTTP_Request *hreq = z_pdu->u.HTTP_Request;
469 std::string peername = p->origin().get_address();
470
471 m_limit_http_req.cleanup(false);
472 int con_sz = m_limit_http_req.get_total(peername.c_str());
473 std::list<IP_Pattern>::const_iterator it = m_p->http_req_max.begin();
474 for (; it != m_p->http_req_max.end(); it++)
475 {
476 if (mp::util::match_ip(it->pattern, peername))
477 {
478 if (it->verbose > 1 ||
479 (it->value && con_sz >= it->value && it->verbose > 0))
480 yaz_log(YLOG_LOG, "http-req-max pattern=%s ip=%s con_sz=%d value=%d", it->pattern.c_str(), peername.c_str(), con_sz, it->value);
481 if (con_sz < it->value)
482 break;
483 mp::odr o;
484 Z_GDU *gdu_res = o.create_HTTP_Response(m_session, hreq, 500);
485 int len;
486 send_GDU(gdu_res, &len);
487 delete p;
488 delete this;
489 return;
490 }
491 }
492 m_limit_http_req.add_connect(peername.c_str());
493 }
494 ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
495 m_thread_pool_observer->put(tp);
496}
497
498void yf::FrontendNet::ZAssocChild::failNotify()
499{
500 // TODO: send Package to signal "close"
501 if (m_session.is_closed())
502 {
503 if (m_no_requests == 0)
504 delete this;
505 return;
506 }
507 m_no_requests++;
508
509 m_session.close();
510
511 mp::Package *p = new mp::Package(m_session, m_origin);
512
513 ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
514 p->copy_route(*m_package);
515 m_thread_pool_observer->cleanup(tp, &m_session);
516 m_thread_pool_observer->put(tp);
517}
518
519void yf::FrontendNet::ZAssocChild::timeoutNotify()
520{
521 failNotify();
522}
523
524void yf::FrontendNet::ZAssocChild::connectNotify()
525{
526
527}
528
529yf::FrontendNet::ZAssocServer::ZAssocServer(
530 yazpp_1::IPDU_Observable *PDU_Observable,
531 Port *port,
532 Rep *rep)
533 :
534 Z_Assoc(PDU_Observable), m_port(port), m_p(rep)
535{
536 m_package = 0;
537}
538
539
540void yf::FrontendNet::ZAssocServer::set_package(const mp::Package *package)
541{
542 m_package = package;
543}
544
// Stores the thread pool observer that is passed on to child associations
// in sessionNotify().
// NOTE(review): the parameter line (listing line 546) is absent from this
// view; the body assigns from a parameter named `observer`.
545void yf::FrontendNet::ZAssocServer::set_thread_pool(
547{
548 m_thread_pool_observer = observer;
549}
550
551yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocServer::sessionNotify(
552 yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
553{
554
555 const char *peername = the_PDU_Observable->getpeername();
556 if (!peername)
557 peername = "unknown";
558 else
559 {
560 const char *cp = strchr(peername, ':');
561 if (cp)
562 peername = cp + 1;
563 }
564 if (peername)
565 {
566 int total_sz = m_p->m_peerStat.get(peername);
567 std::list<IP_Pattern>::const_iterator it = m_p->connect_total.begin();
568 for (; it != m_p->connect_total.end(); it++)
569 {
570 if (mp::util::match_ip(it->pattern, peername))
571 {
572 if (it->verbose > 1 ||
573 (it->value && total_sz >= it->value && it->verbose > 0))
574 yaz_log(YLOG_LOG, "connect-total pattern=%s ip=%s con_sz=%d value=%d", it->pattern.c_str(), peername, total_sz, it->value);
575 if (total_sz < it->value)
576 break;
577 return 0;
578 }
579 }
580 limit_connect.cleanup(false);
581 int con_sz = limit_connect.get_total(peername);
582 it = m_p->connect_max.begin();
583 for (; it != m_p->connect_max.end(); it++)
584 {
585 if (mp::util::match_ip(it->pattern, peername))
586 {
587 if (it->verbose > 1 ||
588 (it->value && con_sz >= it->value && it->verbose > 0))
589 yaz_log(YLOG_LOG, "connect-max pattern=%s ip=%s con_sz=%d value=%d", it->pattern.c_str(), peername, con_sz, it->value);
590 if (con_sz < it->value)
591 break;
592 return 0;
593 }
594 }
595 limit_connect.add_connect(peername);
596 }
597 ZAssocChild *my = new ZAssocChild(the_PDU_Observable,
598 m_thread_pool_observer,
599 m_package, m_port, m_p, limit_http_req,
600 peername);
601 return my;
602}
603
604yf::FrontendNet::ZAssocServer::~ZAssocServer()
605{
606}
607
608void yf::FrontendNet::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
609{
610}
611
612void yf::FrontendNet::ZAssocServer::failNotify()
613{
614}
615
616void yf::FrontendNet::ZAssocServer::timeoutNotify()
617{
618}
619
620void yf::FrontendNet::ZAssocServer::connectNotify()
621{
622}
623
624yf::FrontendNet::FrontendNet() : m_p(new Rep)
625{
626}
627
628yf::FrontendNet::Rep::Rep()
629{
630 m_max_threads = m_no_threads = 5;
631 m_stack_size = 0;
632 m_listen_duration = 0;
633 az = 0;
634 size_t i;
635 for (i = 0; i < 22; i++)
636 m_duration_freq[i] = 0;
637 m_duration_lim[0] = 0.000001;
638 m_duration_lim[1] = 0.00001;
639 m_duration_lim[2] = 0.0001;
640 m_duration_lim[3] = 0.001;
641 m_duration_lim[4] = 0.01;
642 m_duration_lim[5] = 0.1;
643 m_duration_lim[6] = 0.2;
644 m_duration_lim[7] = 0.3;
645 m_duration_lim[8] = 0.5;
646 m_duration_lim[9] = 1.0;
647 m_duration_lim[10] = 1.5;
648 m_duration_lim[11] = 2.0;
649 m_duration_lim[12] = 3.0;
650 m_duration_lim[13] = 4.0;
651 m_duration_lim[14] = 5.0;
652 m_duration_lim[15] = 6.0;
653 m_duration_lim[16] = 8.0;
654 m_duration_lim[17] = 10.0;
655 m_duration_lim[18] = 15.0;
656 m_duration_lim[19] = 20.0;
657 m_duration_lim[20] = 30.0;
658 m_duration_lim[21] = 0.0;
659 m_duration_max = 0.0;
660 m_duration_min = 0.0;
661 m_duration_total = 0.0;
662 m_stop_signo = 0;
663}
664
665yf::FrontendNet::Rep::~Rep()
666{
667 if (az)
668 {
669 size_t i;
670 for (i = 0; i < m_ports.size(); i++)
671 delete az[i];
672 delete [] az;
673 delete [] pdu;
674 }
675 az = 0;
676}
677
678yf::FrontendNet::~FrontendNet()
679{
680}
681
682void yf::FrontendNet::stop(int signo) const
683{
684 m_p->m_stop_signo = signo;
685}
686
687void yf::FrontendNet::start() const
688{
689#if HAVE_GETRLIMIT
690 struct rlimit limit_data;
691 getrlimit(RLIMIT_NOFILE, &limit_data);
692 yaz_log(YLOG_LOG, "getrlimit NOFILE cur=%ld max=%ld",
693 (long) limit_data.rlim_cur, (long) limit_data.rlim_max);
694#endif
695}
696
697bool yf::FrontendNet::My_Timer_Thread::timeout()
698{
699 return m_timeout;
700}
701
702yf::FrontendNet::My_Timer_Thread::My_Timer_Thread(
703 yazpp_1::ISocketObservable *obs,
704 int duration) :
705 m_obs(obs), m_pipe(9123), m_timeout(false)
706{
707 obs->addObserver(m_pipe.read_fd(), this);
708 obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
709 obs->timeoutObserver(this, duration);
710}
711
712void yf::FrontendNet::My_Timer_Thread::socketNotify(int event)
713{
714 m_timeout = true;
715 m_obs->deleteObserver(this);
716}
717
// Runs the socket event loop for all configured listeners.
// Returns at once when no listeners exist (az == 0). Otherwise every
// listener is given the package and the thread pool, and events are
// processed until: processEvent() returns <= 0, SIGTERM was recorded by
// stop(), at most one observer remains, or the optional listen-duration
// timer has fired.
718void yf::FrontendNet::process(mp::Package &package) const
719{
720 if (m_p->az == 0)
721 return;
722 size_t i;
723 My_Timer_Thread *tt = 0;
724
// optional timer, only when a listen duration has been set
725 if (m_p->m_listen_duration)
726 tt = new My_Timer_Thread(&m_p->mySocketManager,
727 m_p->m_listen_duration);
728
// NOTE(review): listing line 729 is absent from this view; it presumably
// declares `tp`, which is used and deleted below - confirm in the original.
730 new ThreadPoolSocketObserver(&m_p->mySocketManager, m_p->m_no_threads,
731 m_p->m_max_threads,
732 m_p->m_stack_size);
733
734 for (i = 0; i<m_p->m_ports.size(); i++)
735 {
736 yaz_log(YLOG_LOG, "listening on %s", m_p->m_ports[i].port.c_str());
737 m_p->az[i]->set_package(&package);
738 m_p->az[i]->set_thread_pool(tp);
739 }
740 while (m_p->mySocketManager.processEvent() > 0)
741 {
// SIGTERM: shut down every listener and return without deleting tp/tt
742 if (m_p->m_stop_signo == SIGTERM)
743 {
744 yaz_log(YLOG_LOG, "metaproxy received SIGTERM");
745 if (m_p->az)
746 {
747 size_t i;
748 for (i = 0; i < m_p->m_ports.size(); i++)
749 {
750 m_p->pdu[i]->shutdown();
751 m_p->az[i]->server("");
752 }
753 yaz_daemon_stop();
754 }
755 return; // do not even attempt to destroy tp or tt
756 }
757#ifndef WIN32
// SIGUSR1: the signal is cleared and the loop keeps running
758 if (m_p->m_stop_signo == SIGUSR1)
759 { /* just stop listeners and cont till all sessions are done*/
760 yaz_log(YLOG_LOG, "metaproxy received SIGUSR1");
761 m_p->m_stop_signo = 0;
762 if (m_p->az)
763 {
764 size_t i;
765 for (i = 0; i < m_p->m_ports.size(); i++)
766 m_p->az[i]->server("");
767 yaz_daemon_stop();
768 }
769 }
770#endif
// leave the loop when at most one observer is left
771 int no = m_p->mySocketManager.getNumberOfObservers();
772 if (no <= 1)
773 break;
774 if (tt && tt->timeout())
775 break;
776 }
777 delete tp;
778 delete tt;
779}
780
781void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
782 const char *path)
783{
784 if (!ptr || !ptr->children)
785 {
786 throw yf::FilterException("No ports for Frontend");
787 }
788 std::vector<Port> ports;
789 for (ptr = ptr->children; ptr; ptr = ptr->next)
790 {
791 if (ptr->type != XML_ELEMENT_NODE)
792 continue;
793 if (!strcmp((const char *) ptr->name, "port"))
794 {
795 Port port;
796
797 const char *names[5] = {"route", "max_recv_bytes", "port",
798 "cert_fname", 0};
799 std::string values[4];
800
801 mp::xml::parse_attr(ptr, names, values);
802 port.route = values[0];
803 if (values[1].length() > 0)
804 port.max_recv_bytes = atoi(values[1].c_str());
805 else
806 port.max_recv_bytes = 0;
807 if (values[2].length() > 0)
808 port.port = values[2];
809 else
810 port.port = mp::xml::get_text(ptr);
811 port.cert_fname = values[3];
812 ports.push_back(port);
813 }
814 else if (!strcmp((const char *) ptr->name, "threads"))
815 {
816 std::string threads_str = mp::xml::get_text(ptr);
817 int threads = atoi(threads_str.c_str());
818 if (threads < 1)
819 throw yf::FilterException("Bad value for threads: "
820 + threads_str);
821 m_p->m_no_threads = threads;
822 }
823 else if (!strcmp((const char *) ptr->name, "max-threads"))
824 {
825 std::string threads_str = mp::xml::get_text(ptr);
826 int threads = atoi(threads_str.c_str());
827 if (threads < 1)
828 throw yf::FilterException("Bad value for max-threads: "
829 + threads_str);
830 m_p->m_max_threads = threads;
831 }
832 else if (!strcmp((const char *) ptr->name, "stack-size"))
833 {
834 std::string sz_str = mp::xml::get_text(ptr);
835 int sz = atoi(sz_str.c_str());
836 if (sz < 0)
837 throw yf::FilterException("Bad value for stack-size: "
838 + sz_str);
839 m_p->m_stack_size = sz * 1024;
840 }
841 else if (!strcmp((const char *) ptr->name, "timeout"))
842 {
843 const char *names[3] = {"ip", "verbose", 0};
844 std::string values[2];
845
846 mp::xml::parse_attr(ptr, names, values);
847 IP_Pattern m;
848 m.value = mp::xml::get_int(ptr, 0);
849 m.pattern = values[0];
850 m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
851 m_p->session_timeout.push_back(m);
852 }
853 else if (!strcmp((const char *) ptr->name, "connect-max"))
854 {
855 const char *names[3] = {"ip", "verbose", 0};
856 std::string values[2];
857
858 mp::xml::parse_attr(ptr, names, values);
859 IP_Pattern m;
860 m.value = mp::xml::get_int(ptr, INT_MAX);
861 m.pattern = values[0];
862 m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
863 m_p->connect_max.push_back(m);
864 }
865 else if (!strcmp((const char *) ptr->name, "connect-total"))
866 {
867 const char *names[3] = {"ip", "verbose", 0};
868 std::string values[2];
869
870 mp::xml::parse_attr(ptr, names, values);
871 IP_Pattern m;
872 m.value = mp::xml::get_int(ptr, INT_MAX);
873 m.pattern = values[0];
874 m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
875 m_p->connect_total.push_back(m);
876 }
877 else if (!strcmp((const char *) ptr->name, "http-req-max"))
878 {
879 const char *names[3] = {"ip", "verbose", 0};
880 std::string values[2];
881
882 mp::xml::parse_attr(ptr, names, values);
883 IP_Pattern m;
884 m.value = mp::xml::get_int(ptr, INT_MAX);
885 m.pattern = values[0];
886 m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
887 m_p->http_req_max.push_back(m);
888 }
889 else if (!strcmp((const char *) ptr->name, "message"))
890 {
891 m_p->m_msg_config = mp::xml::get_text(ptr);
892 }
893 else if (!strcmp((const char *) ptr->name, "stat-req"))
894 {
895 m_p->m_stat_req = mp::xml::get_text(ptr);
896 }
897 else
898 {
899 throw yf::FilterException("Bad element "
900 + std::string((const char *)
901 ptr->name));
902 }
903 }
904 if (m_p->m_msg_config.length() > 0 && m_p->m_stat_req.length() == 0)
905 { // allow stats if message is enabled for filter
906 m_p->m_stat_req = "/fn_stat";
907 }
908 if (test_only)
909 return;
910 set_ports(ports);
911}
912
913void yf::FrontendNet::set_ports(std::vector<std::string> &ports)
914{
915 std::vector<Port> nports;
916 size_t i;
917
918 for (i = 0; i < ports.size(); i++)
919 {
920 Port nport;
921
922 nport.port = ports[i];
923
924 nports.push_back(nport);
925 }
926 set_ports(nports);
927}
928
929
930void yf::FrontendNet::set_ports(std::vector<Port> &ports)
931{
932 m_p->m_ports = ports;
933
934 m_p->az = new yf::FrontendNet::ZAssocServer *[m_p->m_ports.size()];
935 m_p->pdu = new yazpp_1::PDU_Assoc *[m_p->m_ports.size()];
936
937 // Create yf::FrontendNet::ZAssocServer for each port
938 size_t i;
939 for (i = 0; i < m_p->m_ports.size(); i++)
940 m_p->az[i] = 0;
941 for (i = 0; i < m_p->m_ports.size(); i++)
942 {
943 // create a PDU assoc object (one per yf::FrontendNet::ZAssocServer)
944 yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
945
946 if (m_p->m_ports[i].cert_fname.length())
947 as->set_cert_fname(m_p->m_ports[i].cert_fname.c_str());
948 // create ZAssoc with PDU Assoc
949 m_p->pdu[i] = as;
950 m_p->az[i] = new yf::FrontendNet::ZAssocServer(
951 as, &m_p->m_ports[i], m_p.get());
952 if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
953 {
954 throw yf::FilterException("Unable to bind to address "
955 + std::string(m_p->m_ports[i].port));
956 }
957 COMSTACK cs = as->get_comstack();
958
959 if (cs && m_p->m_ports[i].max_recv_bytes)
960 cs_set_max_recv_bytes(cs, m_p->m_ports[i].max_recv_bytes);
961
962 }
963}
964
965void yf::FrontendNet::set_listen_duration(int d)
966{
967 m_p->m_listen_duration = d;
968}
969
970
971yf::FrontendNet::PeerStat::PeerStat()
972{
973 items = 0;
974}
975
976yf::FrontendNet::PeerStat::~PeerStat()
977{
978 while (items)
979 {
980 Item *n = items->next;
981 delete items;
982 items = n;
983 }
984}
985
986
987size_t yf::FrontendNet::PeerStat::add(const std::string &peer)
988{
989 Item *n = items;
990 for (; n; n = n->next)
991 {
992 if (peer == n->m_peer)
993 {
994 n->cnt++;
995 return n->cnt;
996 }
997 }
998 n = new Item();
999 n->cnt = 1;
1000 n->m_peer = peer;
1001 n->next = items;
1002 items = n;
1003 return n->cnt;
1004}
1005
1006size_t yf::FrontendNet::PeerStat::get(const std::string &peer)
1007{
1008 Item *n = items;
1009 for (; n; n = n->next)
1010 {
1011 if (peer == n->m_peer)
1012 return n->cnt;
1013 }
1014 return 0;
1015}
1016
1017size_t yf::FrontendNet::PeerStat::remove(const std::string &peer)
1018{
1019 Item **np = &items;
1020 for (; *np; np = &(*np)->next)
1021 {
1022 Item *n = *np;
1023 if (peer == n->m_peer)
1024 {
1025 if (--n->cnt == 0)
1026 {
1027 *np = n->next;
1028 delete n;
1029 return 0;
1030 }
1031 return n->cnt;
1032 }
1033 }
1034 return 0;
1035}
1036
1037static yf::Base* filter_creator()
1038{
1039 return new yf::FrontendNet;
1040}
1041
// Filter descriptor with C linkage, carrying the filter name "frontend_net".
// NOTE(review): listing line 1046 is absent from this view; it presumably
// holds the remaining initializer (filter_creator is defined just above but
// is not referenced in the visible lines) - confirm in the original file.
1042extern "C" {
1043 struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
1044 0,
1045 "frontend_net",
1047 };
1048}
1049
/*
 * Local variables:
 * c-basic-offset: 4
 * c-file-style: "Stroustrup"
 * indent-tabs-mode: nil
 * End:
 * vim: shiftwidth=4 tabstop=8 expandtab
 */
1058
int & read_fd() const
Definition pipe.cpp:214
yazpp_1::IPDU_Observer * sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
mp::ThreadPoolSocketObserver * m_thread_pool_observer
void set_thread_pool(ThreadPoolSocketObserver *observer)
yazpp_1::IPDU_Observer * sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
void set_ports(std::vector< Port > &ports)
set ports
static mp::filter::Base * filter_creator()
struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net