Line data Source code
1 :
2 : /* Copyright (c) 2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "monitor.h"
7 :
8 : #include "detail/constants.h"
9 : #include "detail/context.h"
10 : #include "detail/socket.h"
11 : #include "log.h"
12 : #include "publisher.h"
13 :
14 : #include <zmq.h>
15 :
16 : namespace zeroeq
17 : {
18 : class Monitor::Impl
19 : {
20 : public:
21 2 : Impl() {}
22 2 : virtual ~Impl() {}
23 2 : void addSockets(std::vector<zeroeq::detail::Socket>& entries)
24 : {
25 : zeroeq::detail::Socket entry;
26 2 : entry.socket = _socket.get();
27 2 : entry.events = ZMQ_POLLIN;
28 2 : entries.push_back(entry);
29 2 : }
30 :
31 : virtual bool process(void* socket, Monitor& monitor) = 0;
32 :
33 : protected:
34 : zmq::SocketPtr _socket;
35 : };
36 :
37 : namespace
38 : {
39 : class XPubImpl : public Monitor::Impl
40 : {
41 : public:
42 2 : XPubImpl(Sender& sender)
43 2 : {
44 2 : _socket = sender.getSocket();
45 :
46 2 : const int on = 1;
47 2 : if (zmq_setsockopt(_socket.get(), ZMQ_XPUB_VERBOSE, &on, sizeof(on)) ==
48 : -1)
49 : {
50 0 : ZEROEQTHROW(std::runtime_error(
51 : std::string("Enabling ZMQ_XPUB_VERBOSE failed: ") +
52 : zmq_strerror(zmq_errno())));
53 : }
54 2 : }
55 :
56 4 : ~XPubImpl()
57 4 : {
58 2 : const int off = 0;
59 2 : zmq_setsockopt(_socket.get(), ZMQ_XPUB_VERBOSE, &off, sizeof(off));
60 4 : }
61 :
62 2 : bool process(void* socket, Monitor& monitor)
63 : {
64 : // Message event is one byte 0=unsub or 1=sub, followed by topic
65 : zmq_msg_t msg;
66 2 : zmq_msg_init(&msg);
67 2 : if (zmq_msg_recv(&msg, socket, 0) == -1)
68 0 : return false;
69 :
70 2 : const uint8_t* data = (const uint8_t*)zmq_msg_data(&msg);
71 2 : switch (*data)
72 : {
73 : case 0:
74 0 : break; // unsub
75 :
76 : case 1: // sub
77 4 : if (zmq_msg_size(&msg) == sizeof(uint8_t) + sizeof(uint128_t) &&
78 2 : *(const uint128_t*)(data + 1) == MEERKAT) // new subscriber
79 : {
80 2 : monitor.notifyNewConnection();
81 2 : return true;
82 : }
83 0 : break;
84 :
85 : default:
86 0 : ZEROEQWARN << "Unhandled monitor event" << std::endl;
87 : }
88 0 : zmq_msg_close(&msg);
89 0 : return false;
90 : }
91 : };
92 :
93 : class SocketImpl : public Monitor::Impl
94 : {
95 : public:
96 0 : SocketImpl(Sender& sender)
97 0 : : _context(detail::getContext())
98 : {
99 : #if (ZMQ_VERSION < 40104)
100 : ZEROEQTHROW(std::runtime_error(
101 : "ZeroEQ version with bug in socket monitor, need at least 4.1.4"));
102 : #endif
103 0 : const auto inproc = std::string("inproc://zeroeq.monitor.") +
104 0 : servus::make_UUID().getString();
105 :
106 0 : if (::zmq_socket_monitor(sender.getSocket().get(), inproc.c_str(),
107 : ZMQ_EVENT_ALL) != 0)
108 : {
109 0 : ZEROEQTHROW(
110 : std::runtime_error(std::string("Cannot monitor socket: ") +
111 : zmq_strerror(zmq_errno())));
112 : }
113 :
114 0 : _socket.reset(::zmq_socket(_context.get(), ZMQ_PAIR),
115 0 : [](void* s) { ::zmq_close(s); });
116 0 : if (!_socket)
117 0 : ZEROEQTHROW(std::runtime_error(
118 : std::string("Cannot create inproc socket: ") +
119 : zmq_strerror(zmq_errno())));
120 :
121 0 : if (::zmq_connect(_socket.get(), inproc.c_str()) != 0)
122 : {
123 0 : ZEROEQTHROW(std::runtime_error(
124 : std::string("Cannot connect inproc socket: ") +
125 : zmq_strerror(zmq_errno())));
126 : }
127 0 : }
128 :
129 0 : ~SocketImpl() {}
130 0 : bool process(void* socket, Monitor& monitor)
131 : {
132 : // Messages consist of 2 Frames, the first containing the event-id and
133 : // the associated value. The second frame holds the affected endpoint as
134 : // string.
135 : zmq_msg_t msg;
136 0 : zmq_msg_init(&msg);
137 :
138 : // The layout of the first Frame is: 16 bit event id 32 bit event value
139 0 : if (zmq_msg_recv(&msg, socket, 0) == -1)
140 : {
141 0 : ZEROEQWARN << "Can't read event id from monitor socket"
142 0 : << std::endl;
143 0 : return false;
144 : }
145 0 : const uint16_t event = *(uint16_t*)zmq_msg_data(&msg);
146 : // Ignore event value
147 :
148 0 : if (zmq_msg_more(&msg))
149 : {
150 0 : zmq_msg_close(&msg);
151 :
152 : // Second frame in message contains event address, skip
153 0 : zmq_msg_init(&msg);
154 0 : if (zmq_msg_recv(&msg, socket, 0) == -1)
155 0 : ZEROEQWARN << "Can't read address from monitor socket"
156 0 : << std::endl;
157 : }
158 : else
159 0 : ZEROEQWARN << "Monitor event has no event address" << std::endl;
160 :
161 0 : zmq_msg_close(&msg);
162 :
163 0 : switch (event)
164 : {
165 : case ZMQ_EVENT_CONNECTED:
166 : case ZMQ_EVENT_ACCEPTED:
167 0 : monitor.notifyNewConnection();
168 0 : return true;
169 :
170 : default:
171 0 : ZEROEQWARN << "Unhandled monitor event " << event << std::endl;
172 : }
173 0 : return false;
174 : }
175 :
176 : private:
177 : zmq::ContextPtr _context;
178 : };
179 :
180 2 : Monitor::Impl* newImpl(Sender& sender)
181 : {
182 2 : if (dynamic_cast<Publisher*>(&sender))
183 2 : return new XPubImpl(sender);
184 0 : return new SocketImpl(sender);
185 : }
186 : }
187 :
188 1 : Monitor::Monitor(Sender& sender)
189 : : Receiver()
190 1 : , _impl(newImpl(sender))
191 : {
192 1 : }
193 :
194 1 : Monitor::Monitor(Sender& sender, Receiver& shared)
195 : : Receiver(shared)
196 1 : , _impl(newImpl(sender))
197 : {
198 1 : }
199 :
200 2 : Monitor::~Monitor()
201 : {
202 2 : }
203 :
204 2 : void Monitor::addSockets(std::vector<zeroeq::detail::Socket>& entries)
205 : {
206 2 : _impl->addSockets(entries);
207 2 : }
208 :
209 2 : bool Monitor::process(zeroeq::detail::Socket& socket)
210 : {
211 2 : return _impl->process(socket.socket, *this);
212 : }
213 24 : }
|