LCOV - code coverage report
Current view: top level - zeroeq - server.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 79 106 74.5 %
Date: 2017-12-01 01:44:57 Functions: 18 27 66.7 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2017, Human Brain Project
       3             :  *                     Stefan.Eilemann@epfl.ch
       4             :  */
       5             : 
       6             : #include "server.h"
       7             : 
       8             : #include "detail/receiver.h"
       9             : #include "detail/sender.h"
      10             : 
      11             : #include <zmq.h>
      12             : 
      13             : #include <cassert>
      14             : #include <unordered_map>
      15             : 
      16             : namespace zeroeq
      17             : {
      18             : class Server::Impl : public detail::Sender
      19             : {
      20             : public:
      21          14 :     Impl(const URI& uri_, const std::string& session)
      22          14 :         : detail::Sender(uri_, ZMQ_REP, SERVER_SERVICE,
      23          28 :                          session == DEFAULT_SESSION ? getDefaultRepSession()
      24          30 :                                                     : session)
      25             :     {
      26          14 :         if (session.empty())
      27           1 :             ZEROEQTHROW(
      28             :                 std::runtime_error("Empty session is not allowed for server"));
      29             : 
      30          26 :         const std::string& zmqURI = buildZmqURI(uri);
      31          13 :         if (zmq_bind(socket.get(), zmqURI.c_str()) == -1)
      32           1 :             ZEROEQTHROW(
      33             :                 std::runtime_error(std::string("Cannot bind server socket '") +
      34             :                                    zmqURI + "': " + zmq_strerror(zmq_errno())));
      35          12 :         initURI();
      36          12 :         if (session != NULL_SESSION)
      37           0 :             announce();
      38          12 :     }
      39             : 
      40          12 :     ~Impl() {}
      41             : 
      42          42 :     bool handle(const uint128_t& request, const HandleFunc& func)
      43             :     {
      44          42 :         if (_handlers.find(request) != _handlers.end())
      45          14 :             return false;
      46             : 
      47          28 :         _handlers[request] = func;
      48          28 :         return true;
      49             :     }
      50             : 
      51          42 :     bool remove(const uint128_t& request)
      52             :     {
      53          42 :         return _handlers.erase(request) > 0;
      54             :     }
      55             : 
      56          14 :     bool process(detail::Socket&)
      57             :     {
      58          14 :         uint128_t requestID;
      59          14 :         const bool payload = _recv(&requestID, sizeof(requestID));
      60             : 
      61             : #ifdef ZEROEQ_BIGENDIAN
      62             :         detail::byteswap(requestID); // convert from little endian wire protocol
      63             : #endif
      64             : 
      65             :         zmq_msg_t msg;
      66          14 :         if (payload)
      67             :         {
      68          10 :             zmq_msg_init(&msg);
      69          10 :             zmq_msg_recv(&msg, socket.get(), 0);
      70             :         }
      71             : 
      72          14 :         auto i = _handlers.find(requestID);
      73          14 :         if (i == _handlers.cend()) // no handler, return "0"
      74             :         {
      75           1 :             const uint128_t zero;
      76           1 :             _send(&zero, sizeof(zero), 0); // request and reply, no playload
      77             :         }
      78             :         else
      79             :         {
      80             :             try
      81             :             {
      82             :                 auto reply =
      83          10 :                     payload ? i->second(zmq_msg_data(&msg), zmq_msg_size(&msg))
      84          35 :                             : i->second(nullptr, 0);
      85          12 :                 const bool hasReplyData = reply.second.ptr && reply.second.size;
      86             : #ifdef ZEROEQ_BIGENDIAN
      87             :                 detail::byteswap(reply.first); // convert to little endian
      88             : #endif
      89          24 :                 if (_send(&reply.first, sizeof(reply.first),
      90          12 :                           hasReplyData ? ZMQ_SNDMORE : 0) &&
      91             :                     hasReplyData)
      92             :                 {
      93          11 :                     _send(reply.second.ptr.get(), reply.second.size, 0);
      94             :                 }
      95             :             }
      96           2 :             catch (...) // handler had exception
      97             :             {
      98           1 :                 const uint128_t zero;
      99           1 :                 _send(&zero, sizeof(zero), 0); // request and reply, no playload
     100             :             }
     101             :         }
     102             : 
     103          14 :         if (payload)
     104          10 :             zmq_msg_close(&msg);
     105          14 :         return true;
     106             :     }
     107             : 
     108             : private:
     109          25 :     bool _send(const void* data, const size_t size, const int flags)
     110             :     {
     111             :         zmq_msg_t msg;
     112          25 :         zmq_msg_init_size(&msg, size);
     113          25 :         ::memcpy(zmq_msg_data(&msg), data, size);
     114          25 :         int ret = zmq_msg_send(&msg, socket.get(), flags);
     115          25 :         zmq_msg_close(&msg);
     116             : 
     117          25 :         if (ret != -1)
     118          25 :             return true;
     119             : 
     120           0 :         ZEROEQWARN << "Cannot send reply: " << zmq_strerror(zmq_errno())
     121           0 :                    << std::endl;
     122           0 :         return false;
     123             :     }
     124             : 
     125             :     /** @return true if more data available */
     126          14 :     bool _recv(void* data, const size_t size)
     127             :     {
     128             :         zmq_msg_t msg;
     129          14 :         zmq_msg_init(&msg);
     130          14 :         zmq_msg_recv(&msg, socket.get(), 0);
     131          14 :         if (zmq_msg_size(&msg) != size)
     132             :         {
     133           0 :             ZEROEQTHROW(std::runtime_error(
     134             :                 std::string("Message size mismatch, expected ") +
     135             :                 std::to_string(size) + " got " +
     136             :                 std::to_string(zmq_msg_size(&msg))));
     137             :         }
     138             :         else
     139          14 :             memcpy(data, zmq_msg_data(&msg), size);
     140          14 :         const bool more = zmq_msg_more(&msg);
     141          14 :         zmq_msg_close(&msg);
     142          14 :         return more;
     143             :     }
     144             : 
     145             :     std::unordered_map<uint128_t, HandleFunc> _handlers;
     146             : };
     147             : 
     148           0 : Server::Server()
     149             :     : Receiver()
     150           0 :     , _impl(new Impl({}, DEFAULT_SESSION))
     151             : {
     152           0 : }
     153             : 
     154          12 : Server::Server(const std::string& session)
     155             :     : Receiver()
     156          13 :     , _impl(new Impl({}, session))
     157             : {
     158          11 : }
     159             : 
     160           1 : Server::Server(const URI& uri)
     161             :     : Receiver()
     162           2 :     , _impl(new Impl(uri, DEFAULT_SESSION))
     163             : {
     164           0 : }
     165             : 
     166           1 : Server::Server(const URI& uri, const std::string& session)
     167             :     : Receiver()
     168           1 :     , _impl(new Impl(uri, session))
     169             : {
     170           1 : }
     171             : 
     172           0 : Server::Server(Receiver& shared)
     173             :     : Receiver(shared)
     174           0 :     , _impl(new Impl({}, DEFAULT_SESSION))
     175             : {
     176           0 : }
     177             : 
     178           0 : Server::Server(const std::string& session, Receiver& shared)
     179             :     : Receiver(shared)
     180           0 :     , _impl(new Impl({}, session))
     181             : {
     182           0 : }
     183             : 
     184           0 : Server::Server(const URI& uri, Receiver& shared)
     185             :     : Receiver(shared)
     186           0 :     , _impl(new Impl(uri, DEFAULT_SESSION))
     187             : {
     188           0 : }
     189             : 
     190           0 : Server::Server(const URI& uri, const std::string& session, Receiver& shared)
     191             :     : Receiver(shared)
     192           0 :     , _impl(new Impl(uri, session))
     193             : {
     194           0 : }
     195             : 
     196          12 : Server::~Server()
     197             : {
     198          12 : }
     199             : 
     200             : Server::Server(Server&&) = default;
     201             : Server& Server::operator=(Server&&) = default;
     202             : 
     203           0 : const std::string& Server::getSession() const
     204             : {
     205           0 :     return _impl->getSession();
     206             : }
     207             : 
     208         417 : void Server::addSockets(std::vector<detail::Socket>& entries)
     209             : {
     210         417 :     _impl->addSockets(entries);
     211         419 : }
     212             : 
     213          14 : bool Server::process(detail::Socket& socket)
     214             : {
     215          14 :     return _impl->process(socket);
     216             : }
     217             : 
     218           0 : void Server::addConnection(const std::string&)
     219             : {
     220           0 :     ZEROEQTHROW(std::runtime_error("Server cannot add connections"));
     221             : }
     222             : 
     223          16 : const URI& Server::getURI() const
     224             : {
     225          16 :     return _impl->uri;
     226             : }
     227             : 
     228          42 : bool Server::handle(const uint128_t& request, const HandleFunc& func)
     229             : {
     230          42 :     return _impl->handle(request, func);
     231             : }
     232             : 
     233          42 : bool Server::remove(const uint128_t& request)
     234             : {
     235          42 :     return _impl->remove(request);
     236             : }
     237             : 
     238           0 : zmq::SocketPtr Server::getSocket()
     239             : {
     240           0 :     return _impl->socket;
     241             : }
     242          24 : }

Generated by: LCOV version 1.11