Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "service.h"
7 : #include <zeroeq/detail/context.h>
8 : #include <zeroeq/detail/port.h>
9 : #include <zeroeq/log.h>
10 : #include <zeroeq/publisher.h>
11 :
12 : #include <zmq.h>
13 :
14 : #include <string.h>
15 :
16 : namespace zeroeq
17 : {
18 : namespace connection
19 : {
20 2 : bool Service::subscribe(const std::string& address, const Publisher& publisher)
21 : {
22 4 : zmq::ContextPtr context = detail::getContext();
23 2 : void* socket = zmq_socket(context.get(), ZMQ_REQ);
24 2 : if (!socket)
25 : {
26 0 : ZEROEQINFO << "Can't create socket: " << zmq_strerror(zmq_errno())
27 0 : << std::endl;
28 0 : return false;
29 : }
30 :
31 4 : const std::string zmqAddress = std::string("tcp://") + address;
32 2 : if (zmq_connect(socket, zmqAddress.c_str()) == -1)
33 : {
34 0 : ZEROEQINFO << "Can't reach connection broker at " << address
35 0 : << std::endl;
36 0 : zmq_close(socket);
37 0 : return false;
38 : }
39 :
40 4 : const std::string& pubAddress = publisher.getAddress();
41 : zmq_msg_t request;
42 2 : zmq_msg_init_size(&request, pubAddress.size());
43 2 : memcpy(zmq_msg_data(&request), pubAddress.c_str(), pubAddress.size());
44 :
45 2 : if (zmq_msg_send(&request, socket, 0) == -1)
46 : {
47 0 : zmq_msg_close(&request);
48 : ZEROEQINFO << "Can't send connection request " << pubAddress << " to "
49 0 : << address << ": " << zmq_strerror(zmq_errno()) << std::endl;
50 0 : return false;
51 : }
52 2 : zmq_msg_close(&request);
53 :
54 : zmq_msg_t reply;
55 2 : zmq_msg_init(&reply);
56 2 : if (zmq_msg_recv(&reply, socket, 0) == -1)
57 : {
58 0 : zmq_msg_close(&reply);
59 0 : ZEROEQINFO << "Can't receive connection reply from " << address
60 0 : << std::endl;
61 0 : return false;
62 : }
63 :
64 2 : const std::string result((const char*)zmq_msg_data(&reply),
65 6 : zmq_msg_size(&reply));
66 2 : zmq_msg_close(&reply);
67 :
68 2 : zmq_close(socket);
69 :
70 2 : return pubAddress == std::string(result);
71 : }
72 :
73 1 : bool Service::subscribe(const std::string& hostname, const std::string& name,
74 : const Publisher& publisher)
75 : {
76 2 : const std::string address(hostname + ":" +
77 4 : std::to_string(uint32_t(detail::getPort(name))));
78 2 : return subscribe(address, publisher);
79 : }
80 : }
81 24 : }
|