LCOV - code coverage report
Current view: top level - zeroeq/detail - receiver.h (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 58 80 72.5 %
Date: 2017-12-01 01:44:57 Functions: 12 18 66.7 %

          Line data    Source code
       1             : /* Copyright (c) 2017, Human Brain Project
       2             :  *                          Stefan.Eilemann@epfl.ch
       3             :  */
       4             : 
       5             : #pragma once
       6             : 
       7             : #include "common.h"
       8             : #include "constants.h"
       9             : #include "context.h"
      10             : #include "socket.h"
      11             : 
      12             : #include "../log.h"
      13             : 
      14             : #include <servus/listener.h>
      15             : #include <servus/servus.h>
      16             : #include <zmq.h>
      17             : 
      18             : #include <algorithm>
      19             : 
      20             : namespace zeroeq
      21             : {
      22             : namespace detail
      23             : {
      24             : /** Manages and updates a set of connections with a zeroconf browser. */
      25             : class Receiver : public servus::Listener
      26             : {
      27             : public:
      28          29 :     Receiver(const std::string& service, const std::string session)
      29          58 :         : _servus(session == TEST_SESSION ? session : service)
      30             :         , _session(session)
      31          62 :         , _context(detail::getContext())
      32             :     {
      33          29 :         if (session == zeroeq::NULL_SESSION || session.empty())
      34           4 :             ZEROEQTHROW(std::runtime_error(
      35             :                 std::string("Invalid session name for browsing")));
      36             : 
      37          25 :         if (!servus::Servus::isAvailable())
      38             :         {
      39             :             ZEROEQWARN << "ZeroEQ::Receiver: Cannot browse Zeroconf for "
      40             :                           "incoming connections; no implementation provided by "
      41           0 :                           "Servus"
      42           0 :                        << std::endl;
      43           0 :             return;
      44             :         }
      45             : 
      46          25 :         _servus.addListener(this);
      47          25 :         _servus.beginBrowsing(servus::Servus::IF_ALL);
      48             :     }
      49             : 
      50          30 :     Receiver(const std::string& service)
      51          30 :         : _servus(service)
      52             :         , _session(zeroeq::NULL_SESSION)
      53          30 :         , _context(detail::getContext())
      54             :     {
      55          30 :     }
      56             : 
      57          55 :     virtual ~Receiver()
      58         110 :     {
      59          55 :         if (_servus.isBrowsing())
      60             :         {
      61          25 :             _servus.endBrowsing();
      62          25 :             _servus.addListener(this);
      63             :         }
      64          55 :     }
      65             : 
      66           2 :     const std::string& getSession() const { return _session; }
      67      380695 :     bool update() //!< @return true if new connection made
      68             :     {
      69      380695 :         if (!_servus.isBrowsing())
      70      317246 :             return false;
      71             : 
      72       63449 :         _updated = false;
      73       63449 :         _servus.browse(0);
      74       63449 :         return _updated;
      75             :     }
      76             : 
      77           6 :     void instanceAdded(const std::string& instance) final
      78             :     {
      79          12 :         const std::string& zmqURI = _getZmqURI(instance);
      80           6 :         if (_sockets.count(zmqURI) > 0) // Already got this instance
      81           0 :             return;
      82             : 
      83           6 :         const std::string& session = _servus.get(instance, KEY_SESSION);
      84          12 :         if (_servus.containsKey(instance, KEY_SESSION) && !_session.empty() &&
      85           6 :             session != _session)
      86             :         {
      87           0 :             return;
      88             :         }
      89             : 
      90           6 :         const uint128_t identifier(_servus.get(instance, KEY_INSTANCE));
      91          12 :         zmq::SocketPtr socket = createSocket(identifier);
      92           6 :         if (socket && _connect(zmqURI, socket))
      93           4 :             _updated = true;
      94             :     }
      95             : 
      96           0 :     void instanceRemoved(const std::string& instance) final
      97             :     {
      98           0 :         if (_disconnect(_getZmqURI(instance)))
      99           0 :             _updated = true;
     100           0 :     }
     101             : 
     102          34 :     bool addConnection(const std::string& zmqURI)
     103             :     {
     104          68 :         zmq::SocketPtr socket = createSocket(uint128_t());
     105          34 :         if (socket)
     106          34 :             return _connect(zmqURI, socket);
     107           0 :         return true;
     108             :     }
     109             : 
     110      380670 :     void addSockets(std::vector<detail::Socket>& entries)
     111             :     {
     112      380670 :         entries.insert(entries.end(), _entries.begin(), _entries.end());
     113      380670 :     }
     114             : 
     115             : protected:
     116             :     using SocketMap = std::map<std::string, zmq::SocketPtr>;
     117             : 
     118          36 :     void* getContext() { return _context.get(); }
     119             :     /**
     120             :      * Create the socket for the given instance, return nullptr if connection is
     121             :      * to be ignored.
     122             :      */
     123             :     virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0;
     124             : 
     125          33 :     const SocketMap& getSockets() { return _sockets; }
     126          38 :     bool _connect(const std::string& zmqURI, zmq::SocketPtr socket)
     127             :     {
     128          38 :         if (zmq_connect(socket.get(), zmqURI.c_str()) == -1)
     129             :         {
     130             :             ZEROEQINFO << "Cannot connect to " << zmqURI << ": "
     131           1 :                        << zmq_strerror(zmq_errno()) << std::endl;
     132           1 :             return false;
     133             :         }
     134             : 
     135          37 :         _sockets[zmqURI] = socket; // ref socket since zmq struct is void*
     136             : 
     137             :         detail::Socket entry;
     138          37 :         entry.socket = socket.get();
     139          37 :         entry.events = ZMQ_POLLIN;
     140          37 :         _entries.push_back(entry);
     141          37 :         return true;
     142             :     }
     143             : 
     144           0 :     bool _disconnect(const std::string& zmqURI)
     145             :     {
     146           0 :         auto i = _sockets.find(zmqURI);
     147           0 :         if (i == _sockets.end()) // Don't know this instance
     148           0 :             return false;
     149             : 
     150           0 :         auto socket = i->second;
     151           0 :         if (zmq_disconnect(socket.get(), zmqURI.c_str()) == -1)
     152             :         {
     153             :             ZEROEQINFO << "Cannot disconnect from " << zmqURI << ": "
     154           0 :                        << zmq_strerror(zmq_errno()) << std::endl;
     155             :         }
     156             : 
     157             :         std::remove_if(_entries.begin(), _entries.end(),
     158           0 :                        [socket](const detail::Socket& candidate) {
     159           0 :                            return candidate.socket == socket.get();
     160           0 :                        });
     161           0 :         _sockets.erase(i);
     162           0 :         return true;
     163             :     }
     164             : 
     165             : private:
     166             :     servus::Servus _servus;
     167             :     const std::string _session;
     168             : 
     169             :     zmq::ContextPtr _context;
     170             :     SocketMap _sockets;
     171             :     std::vector<detail::Socket> _entries;
     172             : 
     173             :     bool _updated{false};
     174             : 
     175           6 :     std::string _getZmqURI(const std::string& instance)
     176             :     {
     177           6 :         const size_t pos = instance.find(":");
     178          12 :         const std::string& host = instance.substr(0, pos);
     179          12 :         const std::string& port = instance.substr(pos + 1);
     180             : 
     181          12 :         return buildZmqURI(DEFAULT_SCHEMA, host, std::stoi(port));
     182             :     }
     183             : };
     184             : }
     185             : }

Generated by: LCOV version 1.11