LCOV - code coverage report
Current view: top level - zeroeq - monitor.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 44 86 51.2 %
Date: 2017-12-01 01:44:57 Functions: 15 22 68.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2017, Human Brain Project
       3             :  *                          Stefan.Eilemann@epfl.ch
       4             :  */
       5             : 
       6             : #include "monitor.h"
       7             : 
       8             : #include "detail/constants.h"
       9             : #include "detail/context.h"
      10             : #include "detail/socket.h"
      11             : #include "log.h"
      12             : #include "publisher.h"
      13             : 
      14             : #include <zmq.h>
      15             : 
      16             : namespace zeroeq
      17             : {
      18             : class Monitor::Impl
      19             : {
      20             : public:
      21           2 :     Impl() {}
      22           2 :     virtual ~Impl() {}
      23           2 :     void addSockets(std::vector<zeroeq::detail::Socket>& entries)
      24             :     {
      25             :         zeroeq::detail::Socket entry;
      26           2 :         entry.socket = _socket.get();
      27           2 :         entry.events = ZMQ_POLLIN;
      28           2 :         entries.push_back(entry);
      29           2 :     }
      30             : 
      31             :     virtual bool process(void* socket, Monitor& monitor) = 0;
      32             : 
      33             : protected:
      34             :     zmq::SocketPtr _socket;
      35             : };
      36             : 
      37             : namespace
      38             : {
      39             : class XPubImpl : public Monitor::Impl
      40             : {
      41             : public:
      42           2 :     XPubImpl(Sender& sender)
      43           2 :     {
      44           2 :         _socket = sender.getSocket();
      45             : 
      46           2 :         const int on = 1;
      47           2 :         if (zmq_setsockopt(_socket.get(), ZMQ_XPUB_VERBOSE, &on, sizeof(on)) ==
      48             :             -1)
      49             :         {
      50           0 :             ZEROEQTHROW(std::runtime_error(
      51             :                 std::string("Enabling ZMQ_XPUB_VERBOSE failed: ") +
      52             :                 zmq_strerror(zmq_errno())));
      53             :         }
      54           2 :     }
      55             : 
      56           4 :     ~XPubImpl()
      57           4 :     {
      58           2 :         const int off = 0;
      59           2 :         zmq_setsockopt(_socket.get(), ZMQ_XPUB_VERBOSE, &off, sizeof(off));
      60           4 :     }
      61             : 
      62           2 :     bool process(void* socket, Monitor& monitor)
      63             :     {
      64             :         // Message event is one byte 0=unsub or 1=sub, followed by topic
      65             :         zmq_msg_t msg;
      66           2 :         zmq_msg_init(&msg);
      67           2 :         if (zmq_msg_recv(&msg, socket, 0) == -1)
      68           0 :             return false;
      69             : 
      70           2 :         const uint8_t* data = (const uint8_t*)zmq_msg_data(&msg);
      71           2 :         switch (*data)
      72             :         {
      73             :         case 0:
      74           0 :             break; // unsub
      75             : 
      76             :         case 1: // sub
      77           4 :             if (zmq_msg_size(&msg) == sizeof(uint8_t) + sizeof(uint128_t) &&
      78           2 :                 *(const uint128_t*)(data + 1) == MEERKAT) // new subscriber
      79             :             {
      80           2 :                 monitor.notifyNewConnection();
      81           2 :                 return true;
      82             :             }
      83           0 :             break;
      84             : 
      85             :         default:
      86           0 :             ZEROEQWARN << "Unhandled monitor event" << std::endl;
      87             :         }
      88           0 :         zmq_msg_close(&msg);
      89           0 :         return false;
      90             :     }
      91             : };
      92             : 
      93             : class SocketImpl : public Monitor::Impl
      94             : {
      95             : public:
      96           0 :     SocketImpl(Sender& sender)
      97           0 :         : _context(detail::getContext())
      98             :     {
      99             : #if (ZMQ_VERSION < 40104)
     100             :         ZEROEQTHROW(std::runtime_error(
     101             :             "ZeroEQ version with bug in socket monitor, need at least 4.1.4"));
     102             : #endif
     103           0 :         const auto inproc = std::string("inproc://zeroeq.monitor.") +
     104           0 :                             servus::make_UUID().getString();
     105             : 
     106           0 :         if (::zmq_socket_monitor(sender.getSocket().get(), inproc.c_str(),
     107             :                                  ZMQ_EVENT_ALL) != 0)
     108             :         {
     109           0 :             ZEROEQTHROW(
     110             :                 std::runtime_error(std::string("Cannot monitor socket: ") +
     111             :                                    zmq_strerror(zmq_errno())));
     112             :         }
     113             : 
     114           0 :         _socket.reset(::zmq_socket(_context.get(), ZMQ_PAIR),
     115           0 :                       [](void* s) { ::zmq_close(s); });
     116           0 :         if (!_socket)
     117           0 :             ZEROEQTHROW(std::runtime_error(
     118             :                 std::string("Cannot create inproc socket: ") +
     119             :                 zmq_strerror(zmq_errno())));
     120             : 
     121           0 :         if (::zmq_connect(_socket.get(), inproc.c_str()) != 0)
     122             :         {
     123           0 :             ZEROEQTHROW(std::runtime_error(
     124             :                 std::string("Cannot connect inproc socket: ") +
     125             :                 zmq_strerror(zmq_errno())));
     126             :         }
     127           0 :     }
     128             : 
     129           0 :     ~SocketImpl() {}
     130           0 :     bool process(void* socket, Monitor& monitor)
     131             :     {
     132             :         // Messages consist of 2 Frames, the first containing the event-id and
     133             :         // the associated value. The second frame holds the affected endpoint as
     134             :         // string.
     135             :         zmq_msg_t msg;
     136           0 :         zmq_msg_init(&msg);
     137             : 
     138             :         //  The layout of the first Frame is: 16 bit event id 32 bit event value
     139           0 :         if (zmq_msg_recv(&msg, socket, 0) == -1)
     140             :         {
     141           0 :             ZEROEQWARN << "Can't read event id from monitor socket"
     142           0 :                        << std::endl;
     143           0 :             return false;
     144             :         }
     145           0 :         const uint16_t event = *(uint16_t*)zmq_msg_data(&msg);
     146             :         // Ignore event value
     147             : 
     148           0 :         if (zmq_msg_more(&msg))
     149             :         {
     150           0 :             zmq_msg_close(&msg);
     151             : 
     152             :             //  Second frame in message contains event address, skip
     153           0 :             zmq_msg_init(&msg);
     154           0 :             if (zmq_msg_recv(&msg, socket, 0) == -1)
     155           0 :                 ZEROEQWARN << "Can't read address from monitor socket"
     156           0 :                            << std::endl;
     157             :         }
     158             :         else
     159           0 :             ZEROEQWARN << "Monitor event has no event address" << std::endl;
     160             : 
     161           0 :         zmq_msg_close(&msg);
     162             : 
     163           0 :         switch (event)
     164             :         {
     165             :         case ZMQ_EVENT_CONNECTED:
     166             :         case ZMQ_EVENT_ACCEPTED:
     167           0 :             monitor.notifyNewConnection();
     168           0 :             return true;
     169             : 
     170             :         default:
     171           0 :             ZEROEQWARN << "Unhandled monitor event " << event << std::endl;
     172             :         }
     173           0 :         return false;
     174             :     }
     175             : 
     176             : private:
     177             :     zmq::ContextPtr _context;
     178             : };
     179             : 
     180           2 : Monitor::Impl* newImpl(Sender& sender)
     181             : {
     182           2 :     if (dynamic_cast<Publisher*>(&sender))
     183           2 :         return new XPubImpl(sender);
     184           0 :     return new SocketImpl(sender);
     185             : }
     186             : }
     187             : 
     188           1 : Monitor::Monitor(Sender& sender)
     189             :     : Receiver()
     190           1 :     , _impl(newImpl(sender))
     191             : {
     192           1 : }
     193             : 
     194           1 : Monitor::Monitor(Sender& sender, Receiver& shared)
     195             :     : Receiver(shared)
     196           1 :     , _impl(newImpl(sender))
     197             : {
     198           1 : }
     199             : 
     200           2 : Monitor::~Monitor()
     201             : {
     202           2 : }
     203             : 
     204           2 : void Monitor::addSockets(std::vector<zeroeq::detail::Socket>& entries)
     205             : {
     206           2 :     _impl->addSockets(entries);
     207           2 : }
     208             : 
     209           2 : bool Monitor::process(zeroeq::detail::Socket& socket)
     210             : {
     211           2 :     return _impl->process(socket.socket, *this);
     212             : }
     213          24 : }

Generated by: LCOV version 1.11