LCOV - code coverage report
Current view: top level - zeroeq - publisher.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 63 67 94.0 %
Date: 2017-12-01 01:44:57 Functions: 18 19 94.7 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2014-2017, Human Brain Project
       3             :  *                          Daniel Nachbaur <daniel.nachbaur@epfl.ch>
       4             :  *                          Stefan.Eilemann@epfl.ch
       5             :  */
       6             : 
       7             : #include "publisher.h"
       8             : 
       9             : #include "detail/byteswap.h"
      10             : #include "detail/common.h"
      11             : #include "detail/constants.h"
      12             : #include "detail/sender.h"
      13             : #include "log.h"
      14             : 
      15             : #include <servus/serializable.h>
      16             : 
      17             : #include <zmq.h>
      18             : 
      19             : #include <cstring>
      20             : #include <map>
      21             : 
      22             : namespace zeroeq
      23             : {
      24             : class Publisher::Impl : public detail::Sender
      25             : {
      26             : public:
      27          26 :     Impl(const URI& uri_, const std::string& session)
      28          26 :         : detail::Sender(uri_, ZMQ_XPUB, PUBLISHER_SERVICE,
      29          52 :                          session == DEFAULT_SESSION ? getDefaultPubSession()
      30          28 :                                                     : session)
      31             :     {
      32          26 :         if (session.empty())
      33           1 :             ZEROEQTHROW(std::runtime_error(
      34             :                 "Empty session is not allowed for publisher"));
      35             : 
      36          50 :         const std::string& zmqURI = buildZmqURI(uri);
      37          25 :         if (zmq_bind(socket.get(), zmqURI.c_str()) == -1)
      38           1 :             ZEROEQTHROW(std::runtime_error(
      39             :                 std::string("Cannot bind publisher socket '") + zmqURI +
      40             :                 "': " + zmq_strerror(zmq_errno())));
      41             : 
      42          24 :         initURI();
      43          24 :         if (session != NULL_SESSION)
      44          10 :             announce();
      45          24 :     }
      46             : 
      47          24 :     ~Impl() {}
      48       40023 :     bool publish(const servus::Serializable& serializable)
      49             :     {
      50       80046 :         const servus::Serializable::Data& data = serializable.toBinary();
      51       40023 :         return publish(serializable.getTypeIdentifier(), data.ptr.get(),
      52      120069 :                        data.size);
      53             :     }
      54             : 
      55       40032 :     bool publish(uint128_t event, const void* data, const size_t size)
      56             :     {
      57             : #ifdef ZEROEQ_BIGENDIAN
      58             :         detail::byteswap(event); // convert to little endian wire protocol
      59             : #endif
      60       40032 :         const bool hasPayload = data && size > 0;
      61             : 
      62             :         zmq_msg_t msgHeader;
      63       40032 :         zmq_msg_init_size(&msgHeader, sizeof(event));
      64       40032 :         memcpy(zmq_msg_data(&msgHeader), &event, sizeof(event));
      65       40032 :         int ret = zmq_msg_send(&msgHeader, socket.get(),
      66       40032 :                                hasPayload ? ZMQ_SNDMORE : 0);
      67       40032 :         zmq_msg_close(&msgHeader);
      68       40032 :         if (ret == -1)
      69             :         {
      70             :             ZEROEQWARN << "Cannot publish message header, got "
      71           0 :                        << zmq_strerror(zmq_errno()) << std::endl;
      72           0 :             return false;
      73             :         }
      74             : 
      75       40032 :         if (!hasPayload)
      76       20009 :             return true;
      77             : 
      78             :         zmq_msg_t msg;
      79       20023 :         zmq_msg_init_size(&msg, size);
      80       20023 :         ::memcpy(zmq_msg_data(&msg), data, size);
      81       20023 :         ret = zmq_msg_send(&msg, socket.get(), 0);
      82       20023 :         zmq_msg_close(&msg);
      83       20023 :         if (ret == -1)
      84             :         {
      85             :             ZEROEQWARN << "Cannot publish message data, got "
      86           0 :                        << zmq_strerror(zmq_errno()) << std::endl;
      87           0 :             return false;
      88             :         }
      89       20023 :         return true;
      90             :     }
      91             : };
      92             : 
      93           1 : Publisher::Publisher()
      94           1 :     : _impl(new Impl(URI(), DEFAULT_SESSION))
      95             : {
      96           1 : }
      97             : 
      98          20 : Publisher::Publisher(const std::string& session)
      99          21 :     : _impl(new Impl(URI(), session))
     100             : {
     101          19 : }
     102             : 
     103           2 : Publisher::Publisher(const URI& uri)
     104           3 :     : _impl(new Impl(uri, DEFAULT_SESSION))
     105             : {
     106           1 : }
     107             : 
     108           3 : Publisher::Publisher(const URI& uri, const std::string& session)
     109           3 :     : _impl(new Impl(uri, session))
     110             : {
     111           3 : }
     112             : 
     113          24 : Publisher::~Publisher()
     114             : {
     115          24 : }
     116             : 
     117       40023 : bool Publisher::publish(const servus::Serializable& serializable)
     118             : {
     119       40023 :     return _impl->publish(serializable);
     120             : }
     121             : 
     122           6 : bool Publisher::publish(const uint128_t& event)
     123             : {
     124           6 :     return _impl->publish(event, nullptr, 0);
     125             : }
     126             : 
     127           3 : bool Publisher::publish(const uint128_t& event, const void* data,
     128             :                         const size_t size)
     129             : {
     130           3 :     return _impl->publish(event, data, size);
     131             : }
     132             : 
     133           2 : std::string Publisher::getAddress() const
     134             : {
     135           2 :     return _impl->getAddress();
     136             : }
     137             : 
     138           7 : const std::string& Publisher::getSession() const
     139             : {
     140           7 :     return _impl->getSession();
     141             : }
     142             : 
     143          15 : const URI& Publisher::getURI() const
     144             : {
     145          15 :     return _impl->uri;
     146             : }
     147             : 
     148           2 : zmq::SocketPtr Publisher::getSocket()
     149             : {
     150           2 :     return _impl->socket;
     151             : }
     152          24 : }

Generated by: LCOV version 1.11