LCOV - code coverage report
Current view: top level - zeroeq/connection - broker.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 54 58 93.1 %
Date: 2017-12-01 01:44:57 Functions: 15 15 100.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2014-2017, Human Brain Project
       3             :  *                          Stefan.Eilemann@epfl.ch
       4             :  */
       5             : 
       6             : #include "broker.h"
       7             : 
       8             : #include <zeroeq/detail/port.h>
       9             : #include <zeroeq/detail/sender.h>
      10             : #include <zeroeq/detail/socket.h>
      11             : #include <zeroeq/log.h>
      12             : #include <zeroeq/receiver.h>
      13             : 
      14             : #include <cassert>
      15             : #include <map>
      16             : 
      17             : namespace zeroeq
      18             : {
      19             : namespace connection
      20             : {
      21             : namespace detail
      22             : {
      23             : class Broker : public zeroeq::detail::Sender
      24             : {
      25             : public:
      26           2 :     Broker(const std::string& name, Receiver& receiver,
      27             :            const connection::Broker::PortSelection mode)
      28           6 :         : Sender(URI(std::string("tcp://*:") +
      29           4 :                      std::to_string(uint32_t(zeroeq::detail::getPort(name)))),
      30             :                  ZMQ_REP)
      31           4 :         , _receiver(receiver)
      32             :     {
      33           2 :         if (!_listen(mode))
      34             :         {
      35           0 :             uri = URI("tcp://*:0");
      36           0 :             _listen(connection::Broker::PORT_FIXED);
      37             :         }
      38           2 :         initURI();
      39           2 :     }
      40             : 
      41           2 :     Broker(Receiver& receiver, const std::string& address)
      42           4 :         : Sender(URI(std::string("tcp://") + address), ZMQ_REP)
      43           5 :         , _receiver(receiver)
      44             :     {
      45           2 :         _listen(connection::Broker::PORT_FIXED);
      46           1 :         initURI();
      47           1 :     }
      48             : 
      49           3 :     ~Broker() {}
      50        8757 :     void addSockets(std::vector<zeroeq::detail::Socket>& entries)
      51             :     {
      52        8757 :         assert(socket);
      53        8757 :         if (!socket)
      54           0 :             return;
      55             : 
      56             :         zeroeq::detail::Socket entry;
      57        8757 :         entry.socket = socket.get();
      58        8757 :         entry.events = ZMQ_POLLIN;
      59        8757 :         entries.push_back(entry);
      60             :     }
      61             : 
      62           2 :     bool process(zeroeq::detail::Socket& socket_)
      63             :     {
      64             :         zmq_msg_t msg;
      65           2 :         zmq_msg_init(&msg);
      66           2 :         zmq_msg_recv(&msg, socket_.socket, 0);
      67           2 :         const std::string address((const char*)zmq_msg_data(&msg),
      68           6 :                                   zmq_msg_size(&msg));
      69             : 
      70           2 :         _receiver.addConnection(std::string("tcp://") + address);
      71           2 :         zmq_msg_send(&msg, socket_.socket, 0);
      72           2 :         zmq_msg_close(&msg);
      73           4 :         return true;
      74             :     }
      75             : 
      76             : private:
      77             :     zeroeq::Receiver& _receiver;
      78             : 
      79           4 :     bool _listen(const connection::Broker::PortSelection mode)
      80             :     {
      81             :         const std::string address =
      82           8 :             std::to_string(uri) + (uri.getPort() ? "" : ":0");
      83           4 :         if (zmq_bind(socket.get(), address.c_str()) == -1)
      84             :         {
      85           1 :             if (mode == connection::Broker::PORT_FIXED)
      86           1 :                 ZEROEQTHROW(std::runtime_error("Cannot connect broker to " +
      87             :                                                address + ": " +
      88             :                                                zmq_strerror(zmq_errno())));
      89             : 
      90           0 :             return false;
      91             :         }
      92             : 
      93           3 :         ZEROEQINFO << "Bound broker to " << address << std::endl;
      94           3 :         return true;
      95             :     }
      96             : };
      97             : }
      98             : 
      99           2 : Broker::Broker(const std::string& name, Receiver& receiver,
     100           2 :                const PortSelection mode)
     101             :     : Receiver(receiver)
     102           2 :     , _impl(new detail::Broker(name, receiver, mode))
     103             : {
     104           2 : }
     105             : 
     106           2 : Broker::Broker(const std::string& address, Receiver& receiver)
     107             :     : Receiver(receiver)
     108           3 :     , _impl(new detail::Broker(receiver, address))
     109             : {
     110           1 : }
     111             : 
     112           9 : Broker::~Broker()
     113             : {
     114           3 :     delete _impl;
     115           6 : }
     116             : 
     117        8757 : void Broker::addSockets(std::vector<zeroeq::detail::Socket>& entries)
     118             : {
     119        8757 :     _impl->addSockets(entries);
     120        8757 : }
     121             : 
     122           2 : bool Broker::process(zeroeq::detail::Socket& socket)
     123             : {
     124           2 :     return _impl->process(socket);
     125             : }
     126             : 
     127           3 : std::string Broker::getAddress() const
     128             : {
     129           3 :     return _impl->getAddress();
     130             : }
     131             : }
     132          24 : }

Generated by: LCOV version 1.11