Line data Source code
1 :
2 : /* Copyright (c) 2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "client.h"
7 :
8 : #include "detail/common.h"
9 : #include "detail/receiver.h"
10 :
11 : #include <servus/servus.h>
12 : #include <thread>
13 : #include <unordered_map>
14 :
15 : namespace zeroeq
16 : {
17 : class Client::Impl : public detail::Receiver
18 : {
19 : public:
20 1 : explicit Impl(const std::string& session)
21 1 : : detail::Receiver(SERVER_SERVICE, session == DEFAULT_SESSION
22 2 : ? getDefaultRepSession()
23 : : session)
24 : , _servers(zmq_socket(getContext(), ZMQ_DEALER),
25 3 : [](void* s) { ::zmq_close(s); })
26 : {
27 1 : const char* serversEnv = getenv("ZEROEQ_SERVERS");
28 1 : if (!serversEnv)
29 0 : return;
30 :
31 2 : std::string servers(serversEnv);
32 5 : while (!servers.empty())
33 : {
34 2 : const size_t pos = servers.find(',');
35 4 : const std::string server = servers.substr(0, pos);
36 4 : servers = pos == std::string::npos ? std::string()
37 2 : : servers.substr(pos + 1);
38 :
39 4 : const auto& zmqURI = buildZmqURI(URI(server));
40 2 : if (!addConnection(zmqURI))
41 0 : ZEROEQTHROW(std::runtime_error("Cannot connect client to " +
42 : zmqURI + ": " +
43 : zmq_strerror(zmq_errno())));
44 : }
45 :
46 1 : update();
47 : }
48 :
49 11 : explicit Impl(const URIs& uris)
50 11 : : detail::Receiver(SERVER_SERVICE)
51 : , _servers(zmq_socket(getContext(), ZMQ_DEALER),
52 22 : [](void* s) { ::zmq_close(s); })
53 : {
54 23 : for (const auto& uri : uris)
55 : {
56 12 : if (!uri.isFullyQualified())
57 0 : ZEROEQTHROW(std::runtime_error(
58 : std::string("Non-fully qualified URI used for server")));
59 :
60 24 : const auto& zmqURI = buildZmqURI(uri);
61 12 : if (!addConnection(zmqURI))
62 0 : ZEROEQTHROW(std::runtime_error("Cannot connect client to " +
63 : zmqURI + ": " +
64 : zmq_strerror(zmq_errno())));
65 : }
66 11 : }
67 :
68 24 : ~Impl() {}
69 :
70 14 : zmq::SocketPtr createSocket(const uint128_t&) final { return _servers; }
71 :
72 14 : bool request(uint128_t requestID, const void* data, const size_t size,
73 : const ReplyFunc& func)
74 : {
75 14 : const bool hasPayload = data && size > 0;
76 14 : ++_id;
77 : #ifdef ZEROEQ_BIGENDIAN
78 : detail::byteswap(requestID); // convert to little endian wire protocol
79 : #endif
80 :
81 42 : if (!_send(&_id, sizeof(_id), ZMQ_SNDMORE) ||
82 28 : !_send(nullptr, 0, ZMQ_SNDMORE) || // frame delimiter
83 14 : !_send(&requestID, sizeof(requestID), hasPayload ? ZMQ_SNDMORE : 0))
84 : {
85 0 : return false;
86 : }
87 :
88 14 : if (hasPayload && !_send(data, size, 0))
89 0 : return false;
90 :
91 14 : _handlers[_id] = func;
92 14 : return true;
93 : }
94 :
95 14 : bool process(detail::Socket& socket)
96 : {
97 : uint64_t id;
98 14 : uint128_t replyID;
99 :
100 14 : if (!_recv(&id, sizeof(id), ZMQ_DONTWAIT) || !_recv(nullptr, 0, 0))
101 0 : return false;
102 14 : const bool payload = _recv(&replyID, sizeof(replyID), 0);
103 :
104 : #ifdef ZEROEQ_BIGENDIAN
105 : detail::byteswap(replyID); // convert to little endian wire protocol
106 : #endif
107 :
108 : zmq_msg_t msg;
109 14 : if (payload)
110 : {
111 11 : zmq_msg_init(&msg);
112 11 : zmq_msg_recv(&msg, socket.socket, 0);
113 : }
114 :
115 14 : auto i = _handlers.find(id);
116 14 : if (i == _handlers.cend())
117 : {
118 0 : if (payload)
119 0 : zmq_msg_close(&msg);
120 :
121 0 : ZEROEQTHROW(std::runtime_error("Got unrequested reply " +
122 : std::to_string(id)));
123 : }
124 :
125 14 : if (payload)
126 : {
127 11 : i->second(replyID, zmq_msg_data(&msg), zmq_msg_size(&msg));
128 11 : zmq_msg_close(&msg);
129 : }
130 : else
131 3 : i->second(replyID, nullptr, 0);
132 14 : _handlers.erase(i);
133 14 : return true;
134 : }
135 :
136 : private:
137 52 : bool _send(const void* data, const size_t size, int flags)
138 : {
139 : zmq_msg_t msg;
140 52 : zmq_msg_init_size(&msg, size);
141 52 : if (data)
142 38 : ::memcpy(zmq_msg_data(&msg), data, size);
143 :
144 52 : flags |= ZMQ_DONTWAIT;
145 : while (true)
146 : {
147 52 : const int ret = zmq_msg_send(&msg, _servers.get(), flags);
148 52 : if (ret == -1 && zmq_errno() == EAGAIN)
149 : {
150 0 : if (!update())
151 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
152 : }
153 : else
154 : {
155 52 : zmq_msg_close(&msg);
156 :
157 52 : if (ret != -1)
158 52 : return true;
159 :
160 : ZEROEQWARN << "Cannot send request: "
161 0 : << zmq_strerror(zmq_errno()) << std::endl;
162 0 : return false;
163 : }
164 0 : }
165 : }
166 :
167 : /** @return true if more data available */
168 42 : bool _recv(void* data, const size_t size, const int flags)
169 : {
170 : zmq_msg_t msg;
171 42 : zmq_msg_init(&msg);
172 42 : if (zmq_msg_recv(&msg, _servers.get(), flags) == -1)
173 0 : return false;
174 :
175 42 : if (zmq_msg_size(&msg) != size)
176 0 : ZEROEQWARN << "Reply size mismatch, expected " << size << " got "
177 0 : << zmq_msg_size(&msg) << std::endl;
178 42 : else if (data)
179 28 : ::memcpy(data, zmq_msg_data(&msg), size);
180 42 : const bool more = zmq_msg_more(&msg);
181 42 : zmq_msg_close(&msg);
182 42 : return more;
183 : }
184 :
185 : zmq::SocketPtr _servers;
186 : std::unordered_map<uint64_t, ReplyFunc> _handlers;
187 : uint64_t _id{0};
188 : };
189 :
190 1 : Client::Client()
191 : : Receiver()
192 1 : , _impl(new Impl(DEFAULT_SESSION))
193 : {
194 1 : }
195 :
196 0 : Client::Client(const std::string& session)
197 : : Receiver()
198 0 : , _impl(new Impl(session))
199 : {
200 0 : }
201 :
202 10 : Client::Client(const URIs& uris)
203 : : Receiver()
204 10 : , _impl(new Impl(uris))
205 : {
206 10 : }
207 :
208 0 : Client::Client(Receiver& shared)
209 : : Receiver(shared)
210 0 : , _impl(new Impl(DEFAULT_SESSION))
211 : {
212 0 : }
213 :
214 0 : Client::Client(const std::string& session, Receiver& shared)
215 : : Receiver(shared)
216 0 : , _impl(new Impl(session))
217 : {
218 0 : }
219 :
220 1 : Client::Client(const URIs& uris, Receiver& shared)
221 : : Receiver(shared)
222 1 : , _impl(new Impl(uris))
223 : {
224 1 : }
225 :
226 12 : Client::~Client()
227 : {
228 12 : }
229 :
230 11 : bool Client::request(const servus::Serializable& req, const ReplyFunc& func)
231 : {
232 22 : const auto& data = req.toBinary();
233 22 : return request(req.getTypeIdentifier(), data.ptr.get(), data.size, func);
234 : }
235 :
236 14 : bool Client::request(const uint128_t& requestID, const void* data,
237 : const size_t size, const ReplyFunc& func)
238 : {
239 14 : return _impl->request(requestID, data, size, func);
240 : }
241 :
242 0 : const std::string& Client::getSession() const
243 : {
244 0 : return _impl->getSession();
245 : }
246 :
247 55422 : void Client::addSockets(std::vector<detail::Socket>& entries)
248 : {
249 55422 : _impl->addSockets(entries);
250 55422 : }
251 :
252 14 : bool Client::process(detail::Socket& socket)
253 : {
254 14 : return _impl->process(socket);
255 : }
256 :
257 55422 : void Client::update()
258 : {
259 55422 : _impl->update();
260 55422 : }
261 :
262 0 : void Client::addConnection(const std::string& uri)
263 : {
264 0 : _impl->addConnection(uri);
265 0 : }
266 24 : }
|