Line data Source code
1 : /* Copyright (c) 2017, Human Brain Project
2 : * Stefan.Eilemann@epfl.ch
3 : */
4 :
5 : #pragma once
6 :
7 : #include "common.h"
8 : #include "constants.h"
9 : #include "context.h"
10 : #include "socket.h"
11 :
12 : #include "../log.h"
13 :
14 : #include <servus/listener.h>
15 : #include <servus/servus.h>
16 : #include <zmq.h>
17 :
18 : #include <algorithm>
19 :
20 : namespace zeroeq
21 : {
22 : namespace detail
23 : {
24 : /** Manages and updates a set of connections with a zeroconf browser. */
25 : class Receiver : public servus::Listener
26 : {
27 : public:
28 29 : Receiver(const std::string& service, const std::string session)
29 58 : : _servus(session == TEST_SESSION ? session : service)
30 : , _session(session)
31 62 : , _context(detail::getContext())
32 : {
33 29 : if (session == zeroeq::NULL_SESSION || session.empty())
34 4 : ZEROEQTHROW(std::runtime_error(
35 : std::string("Invalid session name for browsing")));
36 :
37 25 : if (!servus::Servus::isAvailable())
38 : {
39 : ZEROEQWARN << "ZeroEQ::Receiver: Cannot browse Zeroconf for "
40 : "incoming connections; no implementation provided by "
41 0 : "Servus"
42 0 : << std::endl;
43 0 : return;
44 : }
45 :
46 25 : _servus.addListener(this);
47 25 : _servus.beginBrowsing(servus::Servus::IF_ALL);
48 : }
49 :
50 30 : Receiver(const std::string& service)
51 30 : : _servus(service)
52 : , _session(zeroeq::NULL_SESSION)
53 30 : , _context(detail::getContext())
54 : {
55 30 : }
56 :
57 55 : virtual ~Receiver()
58 110 : {
59 55 : if (_servus.isBrowsing())
60 : {
61 25 : _servus.endBrowsing();
62 25 : _servus.addListener(this);
63 : }
64 55 : }
65 :
66 2 : const std::string& getSession() const { return _session; }
67 380695 : bool update() //!< @return true if new connection made
68 : {
69 380695 : if (!_servus.isBrowsing())
70 317246 : return false;
71 :
72 63449 : _updated = false;
73 63449 : _servus.browse(0);
74 63449 : return _updated;
75 : }
76 :
77 6 : void instanceAdded(const std::string& instance) final
78 : {
79 12 : const std::string& zmqURI = _getZmqURI(instance);
80 6 : if (_sockets.count(zmqURI) > 0) // Already got this instance
81 0 : return;
82 :
83 6 : const std::string& session = _servus.get(instance, KEY_SESSION);
84 12 : if (_servus.containsKey(instance, KEY_SESSION) && !_session.empty() &&
85 6 : session != _session)
86 : {
87 0 : return;
88 : }
89 :
90 6 : const uint128_t identifier(_servus.get(instance, KEY_INSTANCE));
91 12 : zmq::SocketPtr socket = createSocket(identifier);
92 6 : if (socket && _connect(zmqURI, socket))
93 4 : _updated = true;
94 : }
95 :
96 0 : void instanceRemoved(const std::string& instance) final
97 : {
98 0 : if (_disconnect(_getZmqURI(instance)))
99 0 : _updated = true;
100 0 : }
101 :
102 34 : bool addConnection(const std::string& zmqURI)
103 : {
104 68 : zmq::SocketPtr socket = createSocket(uint128_t());
105 34 : if (socket)
106 34 : return _connect(zmqURI, socket);
107 0 : return true;
108 : }
109 :
110 380670 : void addSockets(std::vector<detail::Socket>& entries)
111 : {
112 380670 : entries.insert(entries.end(), _entries.begin(), _entries.end());
113 380670 : }
114 :
115 : protected:
116 : using SocketMap = std::map<std::string, zmq::SocketPtr>;
117 :
118 36 : void* getContext() { return _context.get(); }
119 : /**
120 : * Create the socket for the given instance, return nullptr if connection is
121 : * to be ignored.
122 : */
123 : virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0;
124 :
125 33 : const SocketMap& getSockets() { return _sockets; }
126 38 : bool _connect(const std::string& zmqURI, zmq::SocketPtr socket)
127 : {
128 38 : if (zmq_connect(socket.get(), zmqURI.c_str()) == -1)
129 : {
130 : ZEROEQINFO << "Cannot connect to " << zmqURI << ": "
131 1 : << zmq_strerror(zmq_errno()) << std::endl;
132 1 : return false;
133 : }
134 :
135 37 : _sockets[zmqURI] = socket; // ref socket since zmq struct is void*
136 :
137 : detail::Socket entry;
138 37 : entry.socket = socket.get();
139 37 : entry.events = ZMQ_POLLIN;
140 37 : _entries.push_back(entry);
141 37 : return true;
142 : }
143 :
144 0 : bool _disconnect(const std::string& zmqURI)
145 : {
146 0 : auto i = _sockets.find(zmqURI);
147 0 : if (i == _sockets.end()) // Don't know this instance
148 0 : return false;
149 :
150 0 : auto socket = i->second;
151 0 : if (zmq_disconnect(socket.get(), zmqURI.c_str()) == -1)
152 : {
153 : ZEROEQINFO << "Cannot disconnect from " << zmqURI << ": "
154 0 : << zmq_strerror(zmq_errno()) << std::endl;
155 : }
156 :
157 : std::remove_if(_entries.begin(), _entries.end(),
158 0 : [socket](const detail::Socket& candidate) {
159 0 : return candidate.socket == socket.get();
160 0 : });
161 0 : _sockets.erase(i);
162 0 : return true;
163 : }
164 :
165 : private:
166 : servus::Servus _servus;
167 : const std::string _session;
168 :
169 : zmq::ContextPtr _context;
170 : SocketMap _sockets;
171 : std::vector<detail::Socket> _entries;
172 :
173 : bool _updated{false};
174 :
175 6 : std::string _getZmqURI(const std::string& instance)
176 : {
177 6 : const size_t pos = instance.find(":");
178 12 : const std::string& host = instance.substr(0, pos);
179 12 : const std::string& port = instance.substr(pos + 1);
180 :
181 12 : return buildZmqURI(DEFAULT_SCHEMA, host, std::stoi(port));
182 : }
183 : };
184 : }
185 : }
|