Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "broker.h"
7 :
8 : #include <zeroeq/detail/port.h>
9 : #include <zeroeq/detail/sender.h>
10 : #include <zeroeq/detail/socket.h>
11 : #include <zeroeq/log.h>
12 : #include <zeroeq/receiver.h>
13 :
14 : #include <cassert>
15 : #include <map>
16 :
17 : namespace zeroeq
18 : {
19 : namespace connection
20 : {
21 : namespace detail
22 : {
23 : class Broker : public zeroeq::detail::Sender
24 : {
25 : public:
26 2 : Broker(const std::string& name, Receiver& receiver,
27 : const connection::Broker::PortSelection mode)
28 6 : : Sender(URI(std::string("tcp://*:") +
29 4 : std::to_string(uint32_t(zeroeq::detail::getPort(name)))),
30 : ZMQ_REP)
31 4 : , _receiver(receiver)
32 : {
33 2 : if (!_listen(mode))
34 : {
35 0 : uri = URI("tcp://*:0");
36 0 : _listen(connection::Broker::PORT_FIXED);
37 : }
38 2 : initURI();
39 2 : }
40 :
41 2 : Broker(Receiver& receiver, const std::string& address)
42 4 : : Sender(URI(std::string("tcp://") + address), ZMQ_REP)
43 5 : , _receiver(receiver)
44 : {
45 2 : _listen(connection::Broker::PORT_FIXED);
46 1 : initURI();
47 1 : }
48 :
49 3 : ~Broker() {}
50 8757 : void addSockets(std::vector<zeroeq::detail::Socket>& entries)
51 : {
52 8757 : assert(socket);
53 8757 : if (!socket)
54 0 : return;
55 :
56 : zeroeq::detail::Socket entry;
57 8757 : entry.socket = socket.get();
58 8757 : entry.events = ZMQ_POLLIN;
59 8757 : entries.push_back(entry);
60 : }
61 :
62 2 : bool process(zeroeq::detail::Socket& socket_)
63 : {
64 : zmq_msg_t msg;
65 2 : zmq_msg_init(&msg);
66 2 : zmq_msg_recv(&msg, socket_.socket, 0);
67 2 : const std::string address((const char*)zmq_msg_data(&msg),
68 6 : zmq_msg_size(&msg));
69 :
70 2 : _receiver.addConnection(std::string("tcp://") + address);
71 2 : zmq_msg_send(&msg, socket_.socket, 0);
72 2 : zmq_msg_close(&msg);
73 4 : return true;
74 : }
75 :
76 : private:
77 : zeroeq::Receiver& _receiver;
78 :
79 4 : bool _listen(const connection::Broker::PortSelection mode)
80 : {
81 : const std::string address =
82 8 : std::to_string(uri) + (uri.getPort() ? "" : ":0");
83 4 : if (zmq_bind(socket.get(), address.c_str()) == -1)
84 : {
85 1 : if (mode == connection::Broker::PORT_FIXED)
86 1 : ZEROEQTHROW(std::runtime_error("Cannot connect broker to " +
87 : address + ": " +
88 : zmq_strerror(zmq_errno())));
89 :
90 0 : return false;
91 : }
92 :
93 3 : ZEROEQINFO << "Bound broker to " << address << std::endl;
94 3 : return true;
95 : }
96 : };
97 : }
98 :
99 2 : Broker::Broker(const std::string& name, Receiver& receiver,
100 2 : const PortSelection mode)
101 : : Receiver(receiver)
102 2 : , _impl(new detail::Broker(name, receiver, mode))
103 : {
104 2 : }
105 :
106 2 : Broker::Broker(const std::string& address, Receiver& receiver)
107 : : Receiver(receiver)
108 3 : , _impl(new detail::Broker(receiver, address))
109 : {
110 1 : }
111 :
112 9 : Broker::~Broker()
113 : {
114 3 : delete _impl;
115 6 : }
116 :
117 8757 : void Broker::addSockets(std::vector<zeroeq::detail::Socket>& entries)
118 : {
119 8757 : _impl->addSockets(entries);
120 8757 : }
121 :
122 2 : bool Broker::process(zeroeq::detail::Socket& socket)
123 : {
124 2 : return _impl->process(socket);
125 : }
126 :
127 3 : std::string Broker::getAddress() const
128 : {
129 3 : return _impl->getAddress();
130 : }
131 : }
132 24 : }
|