Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Daniel Nachbaur <daniel.nachbaur@epfl.ch>
4 : * Stefan.Eilemann@epfl.ch
5 : */
6 :
7 : #include "subscriber.h"
8 :
9 : #include "detail/byteswap.h"
10 : #include "detail/common.h"
11 : #include "detail/constants.h"
12 : #include "detail/receiver.h"
13 : #include "detail/sender.h"
14 : #include "detail/socket.h"
15 : #include "log.h"
16 :
17 : #include <servus/serializable.h>
18 : #include <servus/servus.h>
19 :
20 : #include <cassert>
21 : #include <cstring>
22 : #include <map>
23 : #include <stdexcept>
24 :
25 : namespace zeroeq
26 : {
27 80 : class Subscriber::Impl : public detail::Receiver
28 : {
29 : public:
30 28 : Impl(const std::string& session)
31 28 : : detail::Receiver(PUBLISHER_SERVICE, session == DEFAULT_SESSION
32 56 : ? getDefaultPubSession()
33 : : session)
34 52 : , _selfInstance(detail::Sender::getUUID())
35 : {
36 24 : update();
37 24 : }
38 :
39 19 : Impl(const URIs& uris)
40 19 : : detail::Receiver(PUBLISHER_SERVICE)
41 22 : , _selfInstance(detail::Sender::getUUID())
42 : {
43 36 : for (const URI& uri : uris)
44 : {
45 20 : if (!uri.isFullyQualified())
46 2 : ZEROEQTHROW(std::runtime_error(std::string(
47 : "Non-fully qualified URI used for subscriber")));
48 :
49 36 : const std::string& zmqURI = buildZmqURI(uri);
50 18 : if (!addConnection(zmqURI))
51 : {
52 1 : ZEROEQTHROW(std::runtime_error("Cannot connect subscriber to " +
53 : zmqURI + ": " +
54 : zmq_strerror(zmq_errno())));
55 : }
56 : }
57 16 : }
58 :
59 7 : bool subscribe(servus::Serializable& serializable)
60 : {
61 1 : const auto func = [&serializable](const void* data, const size_t size) {
62 1 : serializable.fromBinary(data, size);
63 8 : };
64 7 : return subscribe(serializable.getTypeIdentifier(), func);
65 : }
66 :
67 30 : bool subscribe(const uint128_t& event, const EventPayloadFunc& func)
68 : {
69 30 : if (_eventFuncs.count(event) != 0)
70 2 : return false;
71 :
72 28 : _subscribe(event);
73 28 : _eventFuncs[event] = func;
74 28 : return true;
75 : }
76 :
77 5 : bool unsubscribe(const servus::Serializable& serializable)
78 : {
79 5 : return unsubscribe(serializable.getTypeIdentifier());
80 : }
81 :
82 8 : bool unsubscribe(const uint128_t& event)
83 : {
84 8 : if (_eventFuncs.erase(event) == 0)
85 3 : return false;
86 :
87 5 : _unsubscribe(event);
88 5 : return true;
89 : }
90 :
91 19993 : bool process(detail::Socket& socket)
92 : {
93 : zmq_msg_t msg;
94 19993 : zmq_msg_init(&msg);
95 19993 : zmq_msg_recv(&msg, socket.socket, 0);
96 :
97 19993 : uint128_t type;
98 19993 : memcpy(&type, zmq_msg_data(&msg), sizeof(type));
99 : #ifndef ZEROEQ_LITTLEENDIAN
100 : detail::byteswap(type); // convert from little endian wire
101 : #endif
102 19993 : const bool payload = zmq_msg_more(&msg);
103 19993 : zmq_msg_close(&msg);
104 :
105 19993 : if (payload)
106 : {
107 19989 : zmq_msg_init(&msg);
108 19989 : zmq_msg_recv(&msg, socket.socket, 0);
109 : }
110 :
111 19993 : EventFuncMap::const_iterator i = _eventFuncs.find(type);
112 19993 : if (i == _eventFuncs.cend())
113 : {
114 0 : if (payload)
115 0 : zmq_msg_close(&msg);
116 :
117 0 : ZEROEQTHROW(std::runtime_error("Got unsubscribed event " +
118 : type.getString()));
119 : }
120 :
121 19993 : if (payload)
122 : {
123 19989 : i->second(zmq_msg_data(&msg), zmq_msg_size(&msg));
124 19989 : zmq_msg_close(&msg);
125 : }
126 : else
127 4 : i->second(nullptr, 0);
128 19993 : return true;
129 : }
130 :
131 26 : zmq::SocketPtr createSocket(const uint128_t& instance)
132 : {
133 26 : if (instance == _selfInstance)
134 2 : return {};
135 :
136 : zmq::SocketPtr socket(zmq_socket(getContext(), ZMQ_SUB),
137 72 : [](void* s) { ::zmq_close(s); });
138 24 : const int hwm = 0;
139 24 : zmq_setsockopt(socket.get(), ZMQ_RCVHWM, &hwm, sizeof(hwm));
140 :
141 : // Tell a Monitor on a Publisher we're here
142 24 : if (zmq_setsockopt(socket.get(), ZMQ_SUBSCRIBE, &MEERKAT,
143 : sizeof(uint128_t)) == -1)
144 : {
145 0 : ZEROEQTHROW(std::runtime_error(
146 : std::string("Cannot update meerkat filter: ") +
147 : zmq_strerror(zmq_errno())));
148 : }
149 :
150 : // Add existing subscriptions to socket
151 28 : for (const auto& i : _eventFuncs)
152 : {
153 4 : if (zmq_setsockopt(socket.get(), ZMQ_SUBSCRIBE, &i.first,
154 : sizeof(uint128_t)) == -1)
155 : {
156 0 : ZEROEQTHROW(std::runtime_error(
157 : std::string("Cannot update topic filter: ") +
158 : zmq_strerror(zmq_errno())));
159 : }
160 : }
161 24 : return socket;
162 : }
163 :
164 : private:
165 : typedef std::map<uint128_t, EventPayloadFunc> EventFuncMap;
166 : EventFuncMap _eventFuncs;
167 :
168 : const uint128_t _selfInstance;
169 :
170 28 : void _subscribe(const uint128_t& event)
171 : {
172 44 : for (const auto& socket : getSockets())
173 : {
174 16 : if (zmq_setsockopt(socket.second.get(), ZMQ_SUBSCRIBE, &event,
175 : sizeof(event)) == -1)
176 : {
177 0 : ZEROEQTHROW(std::runtime_error(
178 : std::string("Cannot update topic filter: ") +
179 : zmq_strerror(zmq_errno())));
180 : }
181 : }
182 28 : }
183 :
184 5 : void _unsubscribe(const uint128_t& event)
185 : {
186 6 : for (const auto& socket : getSockets())
187 : {
188 1 : if (zmq_setsockopt(socket.second.get(), ZMQ_UNSUBSCRIBE, &event,
189 : sizeof(event)) == -1)
190 : {
191 0 : ZEROEQTHROW(std::runtime_error(
192 : std::string("Cannot update topic filter: ") +
193 : zmq_strerror(zmq_errno())));
194 : }
195 : }
196 5 : }
197 : };
198 :
199 12 : Subscriber::Subscriber()
200 : : Receiver()
201 12 : , _impl(new Impl(DEFAULT_SESSION))
202 : {
203 12 : }
204 :
205 11 : Subscriber::Subscriber(const std::string& session)
206 : : Receiver()
207 13 : , _impl(new Impl(session))
208 : {
209 9 : }
210 :
211 15 : Subscriber::Subscriber(const URIs& uris)
212 : : Receiver()
213 17 : , _impl(new Impl(uris))
214 : {
215 13 : }
216 :
217 2 : Subscriber::Subscriber(Receiver& shared)
218 : : Receiver(shared)
219 2 : , _impl(new Impl(DEFAULT_SESSION))
220 : {
221 2 : }
222 :
223 3 : Subscriber::Subscriber(const std::string& session, Receiver& shared)
224 : : Receiver(shared)
225 5 : , _impl(new Impl(session))
226 : {
227 1 : }
228 :
229 4 : Subscriber::Subscriber(const URIs& uris, Receiver& shared)
230 : : Receiver(shared)
231 5 : , _impl(new Impl(uris))
232 : {
233 3 : }
234 :
235 41 : Subscriber::~Subscriber()
236 : {
237 41 : }
238 :
239 7 : bool Subscriber::subscribe(servus::Serializable& serializable)
240 : {
241 7 : return _impl->subscribe(serializable);
242 : }
243 :
244 9 : bool Subscriber::subscribe(const uint128_t& event, const EventFunc& func)
245 : {
246 66 : return _impl->subscribe(event, [func](const void*, size_t) { func(); });
247 : }
248 :
249 14 : bool Subscriber::subscribe(const uint128_t& event, const EventPayloadFunc& func)
250 : {
251 14 : return _impl->subscribe(event, func);
252 : }
253 :
254 5 : bool Subscriber::unsubscribe(const servus::Serializable& serializable)
255 : {
256 5 : return _impl->unsubscribe(serializable);
257 : }
258 :
259 3 : bool Subscriber::unsubscribe(const uint128_t& event)
260 : {
261 3 : return _impl->unsubscribe(event);
262 : }
263 :
264 2 : const std::string& Subscriber::getSession() const
265 : {
266 2 : return _impl->getSession();
267 : }
268 :
269 325248 : void Subscriber::addSockets(std::vector<detail::Socket>& entries)
270 : {
271 325248 : _impl->addSockets(entries);
272 325248 : }
273 :
274 19993 : bool Subscriber::process(detail::Socket& socket)
275 : {
276 19993 : return _impl->process(socket);
277 : }
278 :
279 325248 : void Subscriber::update()
280 : {
281 325248 : _impl->update();
282 325248 : }
283 :
284 2 : void Subscriber::addConnection(const std::string& uri)
285 : {
286 2 : _impl->addConnection(uri);
287 2 : }
288 24 : }
|