Line data Source code
1 :
2 : /* Copyright (c) 2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "server.h"
7 :
8 : #include "detail/receiver.h"
9 : #include "detail/sender.h"
10 :
11 : #include <zmq.h>
12 :
13 : #include <cassert>
14 : #include <unordered_map>
15 :
16 : namespace zeroeq
17 : {
18 : class Server::Impl : public detail::Sender
19 : {
20 : public:
21 14 : Impl(const URI& uri_, const std::string& session)
22 14 : : detail::Sender(uri_, ZMQ_REP, SERVER_SERVICE,
23 28 : session == DEFAULT_SESSION ? getDefaultRepSession()
24 30 : : session)
25 : {
26 14 : if (session.empty())
27 1 : ZEROEQTHROW(
28 : std::runtime_error("Empty session is not allowed for server"));
29 :
30 26 : const std::string& zmqURI = buildZmqURI(uri);
31 13 : if (zmq_bind(socket.get(), zmqURI.c_str()) == -1)
32 1 : ZEROEQTHROW(
33 : std::runtime_error(std::string("Cannot bind server socket '") +
34 : zmqURI + "': " + zmq_strerror(zmq_errno())));
35 12 : initURI();
36 12 : if (session != NULL_SESSION)
37 0 : announce();
38 12 : }
39 :
40 12 : ~Impl() {}
41 :
42 42 : bool handle(const uint128_t& request, const HandleFunc& func)
43 : {
44 42 : if (_handlers.find(request) != _handlers.end())
45 14 : return false;
46 :
47 28 : _handlers[request] = func;
48 28 : return true;
49 : }
50 :
51 42 : bool remove(const uint128_t& request)
52 : {
53 42 : return _handlers.erase(request) > 0;
54 : }
55 :
56 14 : bool process(detail::Socket&)
57 : {
58 14 : uint128_t requestID;
59 14 : const bool payload = _recv(&requestID, sizeof(requestID));
60 :
61 : #ifdef ZEROEQ_BIGENDIAN
62 : detail::byteswap(requestID); // convert from little endian wire protocol
63 : #endif
64 :
65 : zmq_msg_t msg;
66 14 : if (payload)
67 : {
68 10 : zmq_msg_init(&msg);
69 10 : zmq_msg_recv(&msg, socket.get(), 0);
70 : }
71 :
72 14 : auto i = _handlers.find(requestID);
73 14 : if (i == _handlers.cend()) // no handler, return "0"
74 : {
75 1 : const uint128_t zero;
76 1 : _send(&zero, sizeof(zero), 0); // request and reply, no playload
77 : }
78 : else
79 : {
80 : try
81 : {
82 : auto reply =
83 10 : payload ? i->second(zmq_msg_data(&msg), zmq_msg_size(&msg))
84 35 : : i->second(nullptr, 0);
85 12 : const bool hasReplyData = reply.second.ptr && reply.second.size;
86 : #ifdef ZEROEQ_BIGENDIAN
87 : detail::byteswap(reply.first); // convert to little endian
88 : #endif
89 24 : if (_send(&reply.first, sizeof(reply.first),
90 12 : hasReplyData ? ZMQ_SNDMORE : 0) &&
91 : hasReplyData)
92 : {
93 11 : _send(reply.second.ptr.get(), reply.second.size, 0);
94 : }
95 : }
96 2 : catch (...) // handler had exception
97 : {
98 1 : const uint128_t zero;
99 1 : _send(&zero, sizeof(zero), 0); // request and reply, no playload
100 : }
101 : }
102 :
103 14 : if (payload)
104 10 : zmq_msg_close(&msg);
105 14 : return true;
106 : }
107 :
108 : private:
109 25 : bool _send(const void* data, const size_t size, const int flags)
110 : {
111 : zmq_msg_t msg;
112 25 : zmq_msg_init_size(&msg, size);
113 25 : ::memcpy(zmq_msg_data(&msg), data, size);
114 25 : int ret = zmq_msg_send(&msg, socket.get(), flags);
115 25 : zmq_msg_close(&msg);
116 :
117 25 : if (ret != -1)
118 25 : return true;
119 :
120 0 : ZEROEQWARN << "Cannot send reply: " << zmq_strerror(zmq_errno())
121 0 : << std::endl;
122 0 : return false;
123 : }
124 :
125 : /** @return true if more data available */
126 14 : bool _recv(void* data, const size_t size)
127 : {
128 : zmq_msg_t msg;
129 14 : zmq_msg_init(&msg);
130 14 : zmq_msg_recv(&msg, socket.get(), 0);
131 14 : if (zmq_msg_size(&msg) != size)
132 : {
133 0 : ZEROEQTHROW(std::runtime_error(
134 : std::string("Message size mismatch, expected ") +
135 : std::to_string(size) + " got " +
136 : std::to_string(zmq_msg_size(&msg))));
137 : }
138 : else
139 14 : memcpy(data, zmq_msg_data(&msg), size);
140 14 : const bool more = zmq_msg_more(&msg);
141 14 : zmq_msg_close(&msg);
142 14 : return more;
143 : }
144 :
145 : std::unordered_map<uint128_t, HandleFunc> _handlers;
146 : };
147 :
148 0 : Server::Server()
149 : : Receiver()
150 0 : , _impl(new Impl({}, DEFAULT_SESSION))
151 : {
152 0 : }
153 :
154 12 : Server::Server(const std::string& session)
155 : : Receiver()
156 13 : , _impl(new Impl({}, session))
157 : {
158 11 : }
159 :
160 1 : Server::Server(const URI& uri)
161 : : Receiver()
162 2 : , _impl(new Impl(uri, DEFAULT_SESSION))
163 : {
164 0 : }
165 :
166 1 : Server::Server(const URI& uri, const std::string& session)
167 : : Receiver()
168 1 : , _impl(new Impl(uri, session))
169 : {
170 1 : }
171 :
172 0 : Server::Server(Receiver& shared)
173 : : Receiver(shared)
174 0 : , _impl(new Impl({}, DEFAULT_SESSION))
175 : {
176 0 : }
177 :
178 0 : Server::Server(const std::string& session, Receiver& shared)
179 : : Receiver(shared)
180 0 : , _impl(new Impl({}, session))
181 : {
182 0 : }
183 :
184 0 : Server::Server(const URI& uri, Receiver& shared)
185 : : Receiver(shared)
186 0 : , _impl(new Impl(uri, DEFAULT_SESSION))
187 : {
188 0 : }
189 :
190 0 : Server::Server(const URI& uri, const std::string& session, Receiver& shared)
191 : : Receiver(shared)
192 0 : , _impl(new Impl(uri, session))
193 : {
194 0 : }
195 :
196 12 : Server::~Server()
197 : {
198 12 : }
199 :
200 : Server::Server(Server&&) = default;
201 : Server& Server::operator=(Server&&) = default;
202 :
203 0 : const std::string& Server::getSession() const
204 : {
205 0 : return _impl->getSession();
206 : }
207 :
208 417 : void Server::addSockets(std::vector<detail::Socket>& entries)
209 : {
210 417 : _impl->addSockets(entries);
211 419 : }
212 :
213 14 : bool Server::process(detail::Socket& socket)
214 : {
215 14 : return _impl->process(socket);
216 : }
217 :
218 0 : void Server::addConnection(const std::string&)
219 : {
220 0 : ZEROEQTHROW(std::runtime_error("Server cannot add connections"));
221 : }
222 :
223 16 : const URI& Server::getURI() const
224 : {
225 16 : return _impl->uri;
226 : }
227 :
228 42 : bool Server::handle(const uint128_t& request, const HandleFunc& func)
229 : {
230 42 : return _impl->handle(request, func);
231 : }
232 :
233 42 : bool Server::remove(const uint128_t& request)
234 : {
235 42 : return _impl->remove(request);
236 : }
237 :
238 0 : zmq::SocketPtr Server::getSocket()
239 : {
240 0 : return _impl->socket;
241 : }
242 24 : }
|