Line data Source code
1 :
2 : /* Copyright (c) 2015-2017, Human Brain Project
3 : * Stefan.Eilemann@epfl.ch
4 : */
5 :
6 : #include "sender.h"
7 :
8 : #include "../log.h"
9 : #include "common.h"
10 : #include "constants.h"
11 : #include "context.h"
12 : #include "socket.h"
13 :
14 : #include <zmq.h>
15 :
16 : // for NI_MAXHOST
17 : #ifdef _WIN32
18 : #include <Ws2tcpip.h>
19 : #else
20 : #include <netdb.h>
21 : #include <unistd.h>
22 : #endif
23 :
24 : namespace zeroeq
25 : {
26 : namespace detail
27 : {
28 4 : Sender::Sender(const URI& uri_, const int type)
29 4 : : Sender(uri_, type, {}, {})
30 : {
31 4 : }
32 :
33 80 : Sender::Sender(const URI& uri_, const int type, const std::string service,
34 80 : const std::string& session)
35 : : _context(getContext())
36 : , uri(uri_)
37 80 : , socket(zmq_socket(_context.get(), type), [](void* s) { ::zmq_close(s); })
38 80 : , _service(session == TEST_SESSION ? session : service)
39 160 : , _session(session)
40 : {
41 80 : const int hwm = 0;
42 80 : zmq_setsockopt(socket.get(), ZMQ_SNDHWM, &hwm, sizeof(hwm));
43 80 : }
44 :
45 160 : Sender::~Sender()
46 : {
47 80 : socket.reset();
48 80 : }
49 :
50 51 : std::string Sender::getAddress() const
51 : {
52 51 : return uri.getHost() + ":" + std::to_string(uint32_t(uri.getPort()));
53 : }
54 :
55 39 : void Sender::initURI()
56 : {
57 39 : if (uri.getScheme() != DEFAULT_SCHEMA)
58 6 : return;
59 :
60 72 : std::string host = uri.getHost();
61 36 : if (host == "*")
62 3 : host.clear();
63 :
64 36 : uint16_t port = uri.getPort();
65 36 : if (!host.empty() && port != 0)
66 0 : return;
67 :
68 72 : std::string hostStr, portStr;
69 36 : _getEndPoint(hostStr, portStr);
70 :
71 36 : if (port == 0)
72 : {
73 : // No overflow is possible unless ZMQ reports bad port number.
74 34 : port = std::stoi(portStr);
75 34 : uri.setPort(port);
76 : }
77 :
78 36 : if (host.empty())
79 34 : uri.setHost(hostStr);
80 :
81 36 : ZEROEQINFO << "Bound to " << uri << std::endl;
82 : }
83 :
84 46 : void Sender::announce()
85 : {
86 46 : if (!servus::Servus::isAvailable())
87 : {
88 : ZEROEQWARN << "ZeroEQ::Sender: Cannot announce on Zeroconf; no "
89 0 : "implementation provided by Servus"
90 0 : << std::endl;
91 0 : return;
92 : }
93 :
94 46 : _service.set("Type", "ZeroEQ");
95 46 : _service.set(KEY_INSTANCE, getUUID().getString());
96 46 : _service.set(KEY_USER, getUserName());
97 46 : _service.set(KEY_APPLICATION, getApplicationName());
98 46 : if (!_session.empty())
99 46 : _service.set(KEY_SESSION, _session);
100 :
101 92 : const auto& result = _service.announce(uri.getPort(), getAddress());
102 46 : if (result == servus::Servus::Result::NOT_SUPPORTED)
103 : {
104 : ZEROEQWARN << "ZeroEQ::Sender: Cannot announce on Zeroconf; no "
105 0 : "implementation provided by Servus"
106 0 : << std::endl;
107 0 : return;
108 : }
109 46 : if (!result)
110 0 : ZEROEQTHROW(std::runtime_error("Zeroconf announce failed: " +
111 : result.getString()));
112 : }
113 :
114 417 : void Sender::addSockets(std::vector<zeroeq::detail::Socket>& entries)
115 : {
116 : zeroeq::detail::Socket entry;
117 417 : entry.socket = socket.get();
118 417 : entry.events = ZMQ_POLLIN;
119 417 : entries.push_back(entry);
120 419 : }
121 :
122 36 : void Sender::_getEndPoint(std::string& host, std::string& port) const
123 : {
124 : char buffer[1024];
125 36 : size_t size = sizeof(buffer);
126 36 : if (zmq_getsockopt(socket.get(), ZMQ_LAST_ENDPOINT, &buffer, &size) == -1)
127 : {
128 0 : ZEROEQTHROW(std::runtime_error("Cannot determine port of publisher"));
129 : }
130 72 : const std::string endPoint(buffer);
131 :
132 36 : port = endPoint.substr(endPoint.find_last_of(":") + 1);
133 36 : const size_t start = endPoint.find_last_of("/") + 1;
134 36 : const size_t end = endPoint.find_last_of(":");
135 36 : host = endPoint.substr(start, end - start);
136 36 : if (host == "0.0.0.0")
137 : {
138 34 : char hostname[NI_MAXHOST + 1] = {0};
139 34 : gethostname(hostname, NI_MAXHOST);
140 34 : hostname[NI_MAXHOST] = '\0';
141 34 : host = hostname;
142 : }
143 36 : }
144 :
145 93 : uint128_t& Sender::getUUID()
146 : {
147 93 : static uint128_t identifier = servus::make_UUID();
148 93 : return identifier;
149 : }
150 : }
151 24 : }
|