LCOV - code coverage report
Current view: top level - zeroeq - subscriber.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 114 121 94.2 %
Date: 2017-12-01 01:44:57 Functions: 38 39 97.4 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2014-2017, Human Brain Project
       3             :  *                          Daniel Nachbaur <daniel.nachbaur@epfl.ch>
       4             :  *                          Stefan.Eilemann@epfl.ch
       5             :  */
       6             : 
       7             : #include "subscriber.h"
       8             : 
       9             : #include "detail/byteswap.h"
      10             : #include "detail/common.h"
      11             : #include "detail/constants.h"
      12             : #include "detail/receiver.h"
      13             : #include "detail/sender.h"
      14             : #include "detail/socket.h"
      15             : #include "log.h"
      16             : 
      17             : #include <servus/serializable.h>
      18             : #include <servus/servus.h>
      19             : 
      20             : #include <cassert>
      21             : #include <cstring>
      22             : #include <map>
      23             : #include <stdexcept>
      24             : 
      25             : namespace zeroeq
      26             : {
      27          80 : class Subscriber::Impl : public detail::Receiver
      28             : {
      29             : public:
      30          28 :     Impl(const std::string& session)
      31          28 :         : detail::Receiver(PUBLISHER_SERVICE, session == DEFAULT_SESSION
      32          56 :                                                   ? getDefaultPubSession()
      33             :                                                   : session)
      34          52 :         , _selfInstance(detail::Sender::getUUID())
      35             :     {
      36          24 :         update();
      37          24 :     }
      38             : 
      39          19 :     Impl(const URIs& uris)
      40          19 :         : detail::Receiver(PUBLISHER_SERVICE)
      41          22 :         , _selfInstance(detail::Sender::getUUID())
      42             :     {
      43          36 :         for (const URI& uri : uris)
      44             :         {
      45          20 :             if (!uri.isFullyQualified())
      46           2 :                 ZEROEQTHROW(std::runtime_error(std::string(
      47             :                     "Non-fully qualified URI used for subscriber")));
      48             : 
      49          36 :             const std::string& zmqURI = buildZmqURI(uri);
      50          18 :             if (!addConnection(zmqURI))
      51             :             {
      52           1 :                 ZEROEQTHROW(std::runtime_error("Cannot connect subscriber to " +
      53             :                                                zmqURI + ": " +
      54             :                                                zmq_strerror(zmq_errno())));
      55             :             }
      56             :         }
      57          16 :     }
      58             : 
      59           7 :     bool subscribe(servus::Serializable& serializable)
      60             :     {
      61           1 :         const auto func = [&serializable](const void* data, const size_t size) {
      62           1 :             serializable.fromBinary(data, size);
      63           8 :         };
      64           7 :         return subscribe(serializable.getTypeIdentifier(), func);
      65             :     }
      66             : 
      67          30 :     bool subscribe(const uint128_t& event, const EventPayloadFunc& func)
      68             :     {
      69          30 :         if (_eventFuncs.count(event) != 0)
      70           2 :             return false;
      71             : 
      72          28 :         _subscribe(event);
      73          28 :         _eventFuncs[event] = func;
      74          28 :         return true;
      75             :     }
      76             : 
      77           5 :     bool unsubscribe(const servus::Serializable& serializable)
      78             :     {
      79           5 :         return unsubscribe(serializable.getTypeIdentifier());
      80             :     }
      81             : 
      82           8 :     bool unsubscribe(const uint128_t& event)
      83             :     {
      84           8 :         if (_eventFuncs.erase(event) == 0)
      85           3 :             return false;
      86             : 
      87           5 :         _unsubscribe(event);
      88           5 :         return true;
      89             :     }
      90             : 
      91       19993 :     bool process(detail::Socket& socket)
      92             :     {
      93             :         zmq_msg_t msg;
      94       19993 :         zmq_msg_init(&msg);
      95       19993 :         zmq_msg_recv(&msg, socket.socket, 0);
      96             : 
      97       19993 :         uint128_t type;
      98       19993 :         memcpy(&type, zmq_msg_data(&msg), sizeof(type));
      99             : #ifndef ZEROEQ_LITTLEENDIAN
     100             :         detail::byteswap(type); // convert from little endian wire
     101             : #endif
     102       19993 :         const bool payload = zmq_msg_more(&msg);
     103       19993 :         zmq_msg_close(&msg);
     104             : 
     105       19993 :         if (payload)
     106             :         {
     107       19989 :             zmq_msg_init(&msg);
     108       19989 :             zmq_msg_recv(&msg, socket.socket, 0);
     109             :         }
     110             : 
     111       19993 :         EventFuncMap::const_iterator i = _eventFuncs.find(type);
     112       19993 :         if (i == _eventFuncs.cend())
     113             :         {
     114           0 :             if (payload)
     115           0 :                 zmq_msg_close(&msg);
     116             : 
     117           0 :             ZEROEQTHROW(std::runtime_error("Got unsubscribed event " +
     118             :                                            type.getString()));
     119             :         }
     120             : 
     121       19993 :         if (payload)
     122             :         {
     123       19989 :             i->second(zmq_msg_data(&msg), zmq_msg_size(&msg));
     124       19989 :             zmq_msg_close(&msg);
     125             :         }
     126             :         else
     127           4 :             i->second(nullptr, 0);
     128       19993 :         return true;
     129             :     }
     130             : 
     131          26 :     zmq::SocketPtr createSocket(const uint128_t& instance)
     132             :     {
     133          26 :         if (instance == _selfInstance)
     134           2 :             return {};
     135             : 
     136             :         zmq::SocketPtr socket(zmq_socket(getContext(), ZMQ_SUB),
     137          72 :                               [](void* s) { ::zmq_close(s); });
     138          24 :         const int hwm = 0;
     139          24 :         zmq_setsockopt(socket.get(), ZMQ_RCVHWM, &hwm, sizeof(hwm));
     140             : 
     141             :         // Tell a Monitor on a Publisher we're here
     142          24 :         if (zmq_setsockopt(socket.get(), ZMQ_SUBSCRIBE, &MEERKAT,
     143             :                            sizeof(uint128_t)) == -1)
     144             :         {
     145           0 :             ZEROEQTHROW(std::runtime_error(
     146             :                 std::string("Cannot update meerkat filter: ") +
     147             :                 zmq_strerror(zmq_errno())));
     148             :         }
     149             : 
     150             :         // Add existing subscriptions to socket
     151          28 :         for (const auto& i : _eventFuncs)
     152             :         {
     153           4 :             if (zmq_setsockopt(socket.get(), ZMQ_SUBSCRIBE, &i.first,
     154             :                                sizeof(uint128_t)) == -1)
     155             :             {
     156           0 :                 ZEROEQTHROW(std::runtime_error(
     157             :                     std::string("Cannot update topic filter: ") +
     158             :                     zmq_strerror(zmq_errno())));
     159             :             }
     160             :         }
     161          24 :         return socket;
     162             :     }
     163             : 
     164             : private:
     165             :     typedef std::map<uint128_t, EventPayloadFunc> EventFuncMap;
     166             :     EventFuncMap _eventFuncs;
     167             : 
     168             :     const uint128_t _selfInstance;
     169             : 
     170          28 :     void _subscribe(const uint128_t& event)
     171             :     {
     172          44 :         for (const auto& socket : getSockets())
     173             :         {
     174          16 :             if (zmq_setsockopt(socket.second.get(), ZMQ_SUBSCRIBE, &event,
     175             :                                sizeof(event)) == -1)
     176             :             {
     177           0 :                 ZEROEQTHROW(std::runtime_error(
     178             :                     std::string("Cannot update topic filter: ") +
     179             :                     zmq_strerror(zmq_errno())));
     180             :             }
     181             :         }
     182          28 :     }
     183             : 
     184           5 :     void _unsubscribe(const uint128_t& event)
     185             :     {
     186           6 :         for (const auto& socket : getSockets())
     187             :         {
     188           1 :             if (zmq_setsockopt(socket.second.get(), ZMQ_UNSUBSCRIBE, &event,
     189             :                                sizeof(event)) == -1)
     190             :             {
     191           0 :                 ZEROEQTHROW(std::runtime_error(
     192             :                     std::string("Cannot update topic filter: ") +
     193             :                     zmq_strerror(zmq_errno())));
     194             :             }
     195             :         }
     196           5 :     }
     197             : };
     198             : 
     199          12 : Subscriber::Subscriber()
     200             :     : Receiver()
     201          12 :     , _impl(new Impl(DEFAULT_SESSION))
     202             : {
     203          12 : }
     204             : 
     205          11 : Subscriber::Subscriber(const std::string& session)
     206             :     : Receiver()
     207          13 :     , _impl(new Impl(session))
     208             : {
     209           9 : }
     210             : 
     211          15 : Subscriber::Subscriber(const URIs& uris)
     212             :     : Receiver()
     213          17 :     , _impl(new Impl(uris))
     214             : {
     215          13 : }
     216             : 
     217           2 : Subscriber::Subscriber(Receiver& shared)
     218             :     : Receiver(shared)
     219           2 :     , _impl(new Impl(DEFAULT_SESSION))
     220             : {
     221           2 : }
     222             : 
     223           3 : Subscriber::Subscriber(const std::string& session, Receiver& shared)
     224             :     : Receiver(shared)
     225           5 :     , _impl(new Impl(session))
     226             : {
     227           1 : }
     228             : 
     229           4 : Subscriber::Subscriber(const URIs& uris, Receiver& shared)
     230             :     : Receiver(shared)
     231           5 :     , _impl(new Impl(uris))
     232             : {
     233           3 : }
     234             : 
     235          41 : Subscriber::~Subscriber()
     236             : {
     237          41 : }
     238             : 
     239           7 : bool Subscriber::subscribe(servus::Serializable& serializable)
     240             : {
     241           7 :     return _impl->subscribe(serializable);
     242             : }
     243             : 
     244           9 : bool Subscriber::subscribe(const uint128_t& event, const EventFunc& func)
     245             : {
     246          66 :     return _impl->subscribe(event, [func](const void*, size_t) { func(); });
     247             : }
     248             : 
     249          14 : bool Subscriber::subscribe(const uint128_t& event, const EventPayloadFunc& func)
     250             : {
     251          14 :     return _impl->subscribe(event, func);
     252             : }
     253             : 
     254           5 : bool Subscriber::unsubscribe(const servus::Serializable& serializable)
     255             : {
     256           5 :     return _impl->unsubscribe(serializable);
     257             : }
     258             : 
     259           3 : bool Subscriber::unsubscribe(const uint128_t& event)
     260             : {
     261           3 :     return _impl->unsubscribe(event);
     262             : }
     263             : 
     264           2 : const std::string& Subscriber::getSession() const
     265             : {
     266           2 :     return _impl->getSession();
     267             : }
     268             : 
     269      325248 : void Subscriber::addSockets(std::vector<detail::Socket>& entries)
     270             : {
     271      325248 :     _impl->addSockets(entries);
     272      325248 : }
     273             : 
     274       19993 : bool Subscriber::process(detail::Socket& socket)
     275             : {
     276       19993 :     return _impl->process(socket);
     277             : }
     278             : 
     279      325248 : void Subscriber::update()
     280             : {
     281      325248 :     _impl->update();
     282      325248 : }
     283             : 
     284           2 : void Subscriber::addConnection(const std::string& uri)
     285             : {
     286           2 :     _impl->addConnection(uri);
     287           2 : }
     288          24 : }

Generated by: LCOV version 1.11