LCOV - code coverage report
Current view: top level - zeroeq - receiver.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 77 78 98.7 %
Date: 2017-12-01 01:44:57 Functions: 13 14 92.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2014-2017, Human Brain Project
       3             :  *                          Daniel Nachbaur <daniel.nachbaur@epfl.ch>
       4             :  *                          Stefan.Eilemann@epfl.ch
       5             :  */
       6             : 
       7             : #define NOMINMAX // otherwise std::min/max below don't work on VS
       8             : 
       9             : #include "receiver.h"
      10             : #include "detail/socket.h"
      11             : #include "log.h"
      12             : 
      13             : #include <algorithm>
      14             : #include <chrono>
      15             : #include <deque>
      16             : #include <stdexcept>
      17             : 
      18             : namespace zeroeq
      19             : {
      20             : using std::chrono::duration_cast;
      21             : using std::chrono::high_resolution_clock;
      22             : using std::chrono::milliseconds;
      23             : using std::chrono::nanoseconds;
      24             : 
      25         192 : class Receiver::Impl
      26             : {
      27             : public:
      28         115 :     void add(::zeroeq::Receiver* receiver) { _shared.push_back(receiver); }
      29         115 :     void remove(::zeroeq::Receiver* receiver)
      30             :     {
      31         230 :         _shared.erase(std::remove(_shared.begin(), _shared.end(), receiver),
      32         345 :                       _shared.end());
      33         115 :     }
      34             : 
      35       60200 :     bool receive(const uint32_t timeout)
      36             :     {
      37       60200 :         if (timeout == TIMEOUT_INDEFINITE)
      38           1 :             return _blockingReceive();
      39             : 
      40             :         // Never fully block. Give receivers a chance to update, e.g., to check
      41             :         // for new connections from zeroconf (#20)
      42       60199 :         const uint32_t block = std::min(1000u, timeout / 10);
      43             : 
      44       60199 :         const auto startTime = high_resolution_clock::now();
      45             :         while (true)
      46             :         {
      47     2449913 :             for (::zeroeq::Receiver* receiver : _shared)
      48     1286326 :                 receiver->update();
      49             : 
      50     1163584 :             const auto endTime = high_resolution_clock::now();
      51             :             const uint32_t elapsed =
      52     1163585 :                 nanoseconds(endTime - startTime).count() / 1000000;
      53     1163583 :             uint32_t wait = 0;
      54     1163583 :             if (elapsed < timeout)
      55     1103534 :                 wait = std::min(timeout - uint32_t(elapsed), block);
      56             : 
      57     1163583 :             if (_receive(wait))
      58       80322 :                 return true;
      59             : 
      60     1143465 :             if (elapsed >= timeout)
      61       40076 :                 return false;
      62     1103389 :         }
      63             :     }
      64             : 
      65             : private:
      66             :     typedef std::vector<::zeroeq::Receiver*> Receivers;
      67             :     typedef Receivers::iterator ReceiversIter;
      68             : 
      69             :     Receivers _shared;
      70             : 
      71        3238 :     bool _blockingReceive()
      72             :     {
      73             :         while (true)
      74             :         {
      75        6476 :             for (::zeroeq::Receiver* receiver : _shared)
      76        3238 :                 receiver->update();
      77             : 
      78             :             // Never fully block. Give receivers a chance to update, e.g., to
      79             :             // check for new connections from zeroconf (#20)
      80        3238 :             if (_receive(1000))
      81           2 :                 return true;
      82        3237 :         }
      83             :     }
      84             : 
      85     1166821 :     bool _receive(uint32_t timeout)
      86             :     {
      87             :         // ZMQ notifications on its sockets is edge-triggered, hence we have
      88             :         // to receive all pending POLLIN events to not 'loose' notifications
      89             :         // from the socket descriptors (c.f. HTTP server).
      90             :         // For reference:
      91             :         // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification
      92     1166821 :         const auto startTime = high_resolution_clock::now();
      93     1166823 :         bool haveData = false;
      94     1166823 :         bool hadData = false;
      95       40248 :         do
      96             :         {
      97     1186947 :             std::vector<detail::Socket> sockets;
      98     1186946 :             std::deque<size_t> intervals;
      99     2456382 :             for (::zeroeq::Receiver* receiver : _shared)
     100             :             {
     101     1289559 :                 const size_t before = sockets.size();
     102     1289558 :                 receiver->addSockets(sockets);
     103     1289564 :                 intervals.push_back(sockets.size() - before);
     104             :             }
     105             : 
     106     2333642 :             const auto remaining = duration_cast<milliseconds>(
     107     2333640 :                                        high_resolution_clock::now() - startTime)
     108     1166822 :                                        .count();
     109             : 
     110     1166820 :             switch (zmq_poll(sockets.data(), int(sockets.size()), remaining))
     111             :             {
     112             :             case -1: // error
     113           0 :                 ZEROEQTHROW(std::runtime_error(std::string("Poll error: ") +
     114             :                                                zmq_strerror(zmq_errno())));
     115             : 
     116             :             case 0: // timeout; no events signaled during poll
     117     1146701 :                 return hadData;
     118             : 
     119             :             default:
     120             :             {
     121             :                 // For each event, find the subscriber which supplied the socket
     122             :                 // and inform it in case there is data on the socket. We saved
     123             :                 // #sockets for each subscriber above and track them down here
     124             :                 // as we iterate over all sockets:
     125       20124 :                 ReceiversIter i = _shared.begin();
     126       20124 :                 size_t interval = intervals.front();
     127       20124 :                 intervals.pop_front();
     128             : 
     129             :                 // prepare for potential next poll; from now on continue
     130             :                 // non-blocking to fullfil edge-triggered contract
     131       20124 :                 haveData = false;
     132       20124 :                 timeout = 0;
     133             : 
     134       40268 :                 for (auto& socket : sockets)
     135             :                 {
     136       20180 :                     while (interval == 0 || interval-- == 0)
     137             :                     {
     138          18 :                         ++i;
     139          18 :                         interval = intervals.front();
     140          18 :                         intervals.pop_front();
     141             :                     }
     142             : 
     143       20144 :                     if (socket.revents & ZMQ_POLLIN)
     144             :                     {
     145       20129 :                         if ((*i)->process(socket))
     146             :                         {
     147       20129 :                             haveData = true;
     148       20129 :                             hadData = true;
     149             :                         }
     150             :                     }
     151             :                 }
     152             :             }
     153             :             }
     154       80496 :         } while (haveData && duration_cast<milliseconds>(
     155       60372 :                                  high_resolution_clock::now() - startTime)
     156       20124 :                                      .count() < timeout);
     157       20124 :         return hadData;
     158             :     }
     159             : };
     160             : 
     161          96 : Receiver::Receiver()
     162          96 :     : _impl(new Receiver::Impl)
     163             : {
     164          96 :     _impl->add(this);
     165          96 : }
     166             : 
     167          19 : Receiver::Receiver(Receiver& shared)
     168          19 :     : _impl(shared._impl)
     169             : {
     170          19 :     _impl->add(this);
     171          19 : }
     172             : 
     173         230 : Receiver::~Receiver()
     174             : {
     175         115 :     _impl->remove(this);
     176         115 : }
     177             : 
     178             : Receiver::Receiver(Receiver&&) = default;
     179             : Receiver& Receiver::operator=(Receiver&&) = default;
     180             : 
     181       60200 : bool Receiver::receive(const uint32_t timeout)
     182             : {
     183       60200 :     return _impl->receive(timeout);
     184             : }
     185             : 
     186             : // LCOV_EXCL_START
     187             : void Receiver::addConnection(const std::string&)
     188             : {
     189             :     ZEROEQDONTCALL;
     190             : }
     191             : // LCOV_EXCL_STOP
     192          24 : }

Generated by: LCOV version 1.11