Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Daniel Nachbaur <daniel.nachbaur@epfl.ch>
4 : * Stefan.Eilemann@epfl.ch
5 : */
6 :
7 : #define NOMINMAX // otherwise std::min/max below don't work on VS
8 :
#include "receiver.h"
#include "detail/socket.h"
#include "log.h"

#include <algorithm>
#include <chrono>
#include <deque>
#include <stdexcept>
#include <utility>
17 :
18 : namespace zeroeq
19 : {
20 : using std::chrono::duration_cast;
21 : using std::chrono::high_resolution_clock;
22 : using std::chrono::milliseconds;
23 : using std::chrono::nanoseconds;
24 :
25 192 : class Receiver::Impl
26 : {
27 : public:
28 115 : void add(::zeroeq::Receiver* receiver) { _shared.push_back(receiver); }
29 115 : void remove(::zeroeq::Receiver* receiver)
30 : {
31 230 : _shared.erase(std::remove(_shared.begin(), _shared.end(), receiver),
32 345 : _shared.end());
33 115 : }
34 :
35 60200 : bool receive(const uint32_t timeout)
36 : {
37 60200 : if (timeout == TIMEOUT_INDEFINITE)
38 1 : return _blockingReceive();
39 :
40 : // Never fully block. Give receivers a chance to update, e.g., to check
41 : // for new connections from zeroconf (#20)
42 60199 : const uint32_t block = std::min(1000u, timeout / 10);
43 :
44 60199 : const auto startTime = high_resolution_clock::now();
45 : while (true)
46 : {
47 2449913 : for (::zeroeq::Receiver* receiver : _shared)
48 1286326 : receiver->update();
49 :
50 1163584 : const auto endTime = high_resolution_clock::now();
51 : const uint32_t elapsed =
52 1163585 : nanoseconds(endTime - startTime).count() / 1000000;
53 1163583 : uint32_t wait = 0;
54 1163583 : if (elapsed < timeout)
55 1103534 : wait = std::min(timeout - uint32_t(elapsed), block);
56 :
57 1163583 : if (_receive(wait))
58 80322 : return true;
59 :
60 1143465 : if (elapsed >= timeout)
61 40076 : return false;
62 1103389 : }
63 : }
64 :
65 : private:
66 : typedef std::vector<::zeroeq::Receiver*> Receivers;
67 : typedef Receivers::iterator ReceiversIter;
68 :
69 : Receivers _shared;
70 :
71 3238 : bool _blockingReceive()
72 : {
73 : while (true)
74 : {
75 6476 : for (::zeroeq::Receiver* receiver : _shared)
76 3238 : receiver->update();
77 :
78 : // Never fully block. Give receivers a chance to update, e.g., to
79 : // check for new connections from zeroconf (#20)
80 3238 : if (_receive(1000))
81 2 : return true;
82 3237 : }
83 : }
84 :
85 1166821 : bool _receive(uint32_t timeout)
86 : {
87 : // ZMQ notifications on its sockets is edge-triggered, hence we have
88 : // to receive all pending POLLIN events to not 'loose' notifications
89 : // from the socket descriptors (c.f. HTTP server).
90 : // For reference:
91 : // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification
92 1166821 : const auto startTime = high_resolution_clock::now();
93 1166823 : bool haveData = false;
94 1166823 : bool hadData = false;
95 40248 : do
96 : {
97 1186947 : std::vector<detail::Socket> sockets;
98 1186946 : std::deque<size_t> intervals;
99 2456382 : for (::zeroeq::Receiver* receiver : _shared)
100 : {
101 1289559 : const size_t before = sockets.size();
102 1289558 : receiver->addSockets(sockets);
103 1289564 : intervals.push_back(sockets.size() - before);
104 : }
105 :
106 2333642 : const auto remaining = duration_cast<milliseconds>(
107 2333640 : high_resolution_clock::now() - startTime)
108 1166822 : .count();
109 :
110 1166820 : switch (zmq_poll(sockets.data(), int(sockets.size()), remaining))
111 : {
112 : case -1: // error
113 0 : ZEROEQTHROW(std::runtime_error(std::string("Poll error: ") +
114 : zmq_strerror(zmq_errno())));
115 :
116 : case 0: // timeout; no events signaled during poll
117 1146701 : return hadData;
118 :
119 : default:
120 : {
121 : // For each event, find the subscriber which supplied the socket
122 : // and inform it in case there is data on the socket. We saved
123 : // #sockets for each subscriber above and track them down here
124 : // as we iterate over all sockets:
125 20124 : ReceiversIter i = _shared.begin();
126 20124 : size_t interval = intervals.front();
127 20124 : intervals.pop_front();
128 :
129 : // prepare for potential next poll; from now on continue
130 : // non-blocking to fullfil edge-triggered contract
131 20124 : haveData = false;
132 20124 : timeout = 0;
133 :
134 40268 : for (auto& socket : sockets)
135 : {
136 20180 : while (interval == 0 || interval-- == 0)
137 : {
138 18 : ++i;
139 18 : interval = intervals.front();
140 18 : intervals.pop_front();
141 : }
142 :
143 20144 : if (socket.revents & ZMQ_POLLIN)
144 : {
145 20129 : if ((*i)->process(socket))
146 : {
147 20129 : haveData = true;
148 20129 : hadData = true;
149 : }
150 : }
151 : }
152 : }
153 : }
154 80496 : } while (haveData && duration_cast<milliseconds>(
155 60372 : high_resolution_clock::now() - startTime)
156 20124 : .count() < timeout);
157 20124 : return hadData;
158 : }
159 : };
160 :
161 96 : Receiver::Receiver()
162 96 : : _impl(new Receiver::Impl)
163 : {
164 96 : _impl->add(this);
165 96 : }
166 :
167 19 : Receiver::Receiver(Receiver& shared)
168 19 : : _impl(shared._impl)
169 : {
170 19 : _impl->add(this);
171 19 : }
172 :
173 230 : Receiver::~Receiver()
174 : {
175 115 : _impl->remove(this);
176 115 : }
177 :
178 : Receiver::Receiver(Receiver&&) = default;
179 : Receiver& Receiver::operator=(Receiver&&) = default;
180 :
181 60200 : bool Receiver::receive(const uint32_t timeout)
182 : {
183 60200 : return _impl->receive(timeout);
184 : }
185 :
186 : // LCOV_EXCL_START
187 : void Receiver::addConnection(const std::string&)
188 : {
189 : ZEROEQDONTCALL;
190 : }
191 : // LCOV_EXCL_STOP
192 24 : }
|