LCOV - code coverage report
Current view: top level - zeroeq - client.cpp (source / functions) Hit Total Coverage
Test: ZeroEQ Lines: 93 125 74.4 %
Date: 2017-12-01 01:44:57 Functions: 22 30 73.3 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2017, Human Brain Project
       3             :  *                     Stefan.Eilemann@epfl.ch
       4             :  */
       5             : 
       6             : #include "client.h"
       7             : 
       8             : #include "detail/common.h"
       9             : #include "detail/receiver.h"
      10             : 
      11             : #include <servus/servus.h>
      12             : #include <thread>
      13             : #include <unordered_map>
      14             : 
      15             : namespace zeroeq
      16             : {
      17             : class Client::Impl : public detail::Receiver
      18             : {
      19             : public:
      20           1 :     explicit Impl(const std::string& session)
      21           1 :         : detail::Receiver(SERVER_SERVICE, session == DEFAULT_SESSION
      22           2 :                                                ? getDefaultRepSession()
      23             :                                                : session)
      24             :         , _servers(zmq_socket(getContext(), ZMQ_DEALER),
      25           3 :                    [](void* s) { ::zmq_close(s); })
      26             :     {
      27           1 :         const char* serversEnv = getenv("ZEROEQ_SERVERS");
      28           1 :         if (!serversEnv)
      29           0 :             return;
      30             : 
      31           2 :         std::string servers(serversEnv);
      32           5 :         while (!servers.empty())
      33             :         {
      34           2 :             const size_t pos = servers.find(',');
      35           4 :             const std::string server = servers.substr(0, pos);
      36           4 :             servers = pos == std::string::npos ? std::string()
      37           2 :                                                : servers.substr(pos + 1);
      38             : 
      39           4 :             const auto& zmqURI = buildZmqURI(URI(server));
      40           2 :             if (!addConnection(zmqURI))
      41           0 :                 ZEROEQTHROW(std::runtime_error("Cannot connect client to " +
      42             :                                                zmqURI + ": " +
      43             :                                                zmq_strerror(zmq_errno())));
      44             :         }
      45             : 
      46           1 :         update();
      47             :     }
      48             : 
      49          11 :     explicit Impl(const URIs& uris)
      50          11 :         : detail::Receiver(SERVER_SERVICE)
      51             :         , _servers(zmq_socket(getContext(), ZMQ_DEALER),
      52          22 :                    [](void* s) { ::zmq_close(s); })
      53             :     {
      54          23 :         for (const auto& uri : uris)
      55             :         {
      56          12 :             if (!uri.isFullyQualified())
      57           0 :                 ZEROEQTHROW(std::runtime_error(
      58             :                     std::string("Non-fully qualified URI used for server")));
      59             : 
      60          24 :             const auto& zmqURI = buildZmqURI(uri);
      61          12 :             if (!addConnection(zmqURI))
      62           0 :                 ZEROEQTHROW(std::runtime_error("Cannot connect client to " +
      63             :                                                zmqURI + ": " +
      64             :                                                zmq_strerror(zmq_errno())));
      65             :         }
      66          11 :     }
      67             : 
      68          24 :     ~Impl() {}
      69             : 
      70          14 :     zmq::SocketPtr createSocket(const uint128_t&) final { return _servers; }
      71             : 
      72          14 :     bool request(uint128_t requestID, const void* data, const size_t size,
      73             :                  const ReplyFunc& func)
      74             :     {
      75          14 :         const bool hasPayload = data && size > 0;
      76          14 :         ++_id;
      77             : #ifdef ZEROEQ_BIGENDIAN
      78             :         detail::byteswap(requestID); // convert to little endian wire protocol
      79             : #endif
      80             : 
      81          42 :         if (!_send(&_id, sizeof(_id), ZMQ_SNDMORE) ||
      82          28 :             !_send(nullptr, 0, ZMQ_SNDMORE) || // frame delimiter
      83          14 :             !_send(&requestID, sizeof(requestID), hasPayload ? ZMQ_SNDMORE : 0))
      84             :         {
      85           0 :             return false;
      86             :         }
      87             : 
      88          14 :         if (hasPayload && !_send(data, size, 0))
      89           0 :             return false;
      90             : 
      91          14 :         _handlers[_id] = func;
      92          14 :         return true;
      93             :     }
      94             : 
      95          14 :     bool process(detail::Socket& socket)
      96             :     {
      97             :         uint64_t id;
      98          14 :         uint128_t replyID;
      99             : 
     100          14 :         if (!_recv(&id, sizeof(id), ZMQ_DONTWAIT) || !_recv(nullptr, 0, 0))
     101           0 :             return false;
     102          14 :         const bool payload = _recv(&replyID, sizeof(replyID), 0);
     103             : 
     104             : #ifdef ZEROEQ_BIGENDIAN
     105             :         detail::byteswap(replyID); // convert to little endian wire protocol
     106             : #endif
     107             : 
     108             :         zmq_msg_t msg;
     109          14 :         if (payload)
     110             :         {
     111          11 :             zmq_msg_init(&msg);
     112          11 :             zmq_msg_recv(&msg, socket.socket, 0);
     113             :         }
     114             : 
     115          14 :         auto i = _handlers.find(id);
     116          14 :         if (i == _handlers.cend())
     117             :         {
     118           0 :             if (payload)
     119           0 :                 zmq_msg_close(&msg);
     120             : 
     121           0 :             ZEROEQTHROW(std::runtime_error("Got unrequested reply " +
     122             :                                            std::to_string(id)));
     123             :         }
     124             : 
     125          14 :         if (payload)
     126             :         {
     127          11 :             i->second(replyID, zmq_msg_data(&msg), zmq_msg_size(&msg));
     128          11 :             zmq_msg_close(&msg);
     129             :         }
     130             :         else
     131           3 :             i->second(replyID, nullptr, 0);
     132          14 :         _handlers.erase(i);
     133          14 :         return true;
     134             :     }
     135             : 
     136             : private:
     137          52 :     bool _send(const void* data, const size_t size, int flags)
     138             :     {
     139             :         zmq_msg_t msg;
     140          52 :         zmq_msg_init_size(&msg, size);
     141          52 :         if (data)
     142          38 :             ::memcpy(zmq_msg_data(&msg), data, size);
     143             : 
     144          52 :         flags |= ZMQ_DONTWAIT;
     145             :         while (true)
     146             :         {
     147          52 :             const int ret = zmq_msg_send(&msg, _servers.get(), flags);
     148          52 :             if (ret == -1 && zmq_errno() == EAGAIN)
     149             :             {
     150           0 :                 if (!update())
     151           0 :                     std::this_thread::sleep_for(std::chrono::milliseconds(10));
     152             :             }
     153             :             else
     154             :             {
     155          52 :                 zmq_msg_close(&msg);
     156             : 
     157          52 :                 if (ret != -1)
     158          52 :                     return true;
     159             : 
     160             :                 ZEROEQWARN << "Cannot send request: "
     161           0 :                            << zmq_strerror(zmq_errno()) << std::endl;
     162           0 :                 return false;
     163             :             }
     164           0 :         }
     165             :     }
     166             : 
     167             :     /** @return true if more data available */
     168          42 :     bool _recv(void* data, const size_t size, const int flags)
     169             :     {
     170             :         zmq_msg_t msg;
     171          42 :         zmq_msg_init(&msg);
     172          42 :         if (zmq_msg_recv(&msg, _servers.get(), flags) == -1)
     173           0 :             return false;
     174             : 
     175          42 :         if (zmq_msg_size(&msg) != size)
     176           0 :             ZEROEQWARN << "Reply size mismatch, expected " << size << " got "
     177           0 :                        << zmq_msg_size(&msg) << std::endl;
     178          42 :         else if (data)
     179          28 :             ::memcpy(data, zmq_msg_data(&msg), size);
     180          42 :         const bool more = zmq_msg_more(&msg);
     181          42 :         zmq_msg_close(&msg);
     182          42 :         return more;
     183             :     }
     184             : 
     185             :     zmq::SocketPtr _servers;
     186             :     std::unordered_map<uint64_t, ReplyFunc> _handlers;
     187             :     uint64_t _id{0};
     188             : };
     189             : 
     190           1 : Client::Client()
     191             :     : Receiver()
     192           1 :     , _impl(new Impl(DEFAULT_SESSION))
     193             : {
     194           1 : }
     195             : 
     196           0 : Client::Client(const std::string& session)
     197             :     : Receiver()
     198           0 :     , _impl(new Impl(session))
     199             : {
     200           0 : }
     201             : 
     202          10 : Client::Client(const URIs& uris)
     203             :     : Receiver()
     204          10 :     , _impl(new Impl(uris))
     205             : {
     206          10 : }
     207             : 
     208           0 : Client::Client(Receiver& shared)
     209             :     : Receiver(shared)
     210           0 :     , _impl(new Impl(DEFAULT_SESSION))
     211             : {
     212           0 : }
     213             : 
     214           0 : Client::Client(const std::string& session, Receiver& shared)
     215             :     : Receiver(shared)
     216           0 :     , _impl(new Impl(session))
     217             : {
     218           0 : }
     219             : 
     220           1 : Client::Client(const URIs& uris, Receiver& shared)
     221             :     : Receiver(shared)
     222           1 :     , _impl(new Impl(uris))
     223             : {
     224           1 : }
     225             : 
     226          12 : Client::~Client()
     227             : {
     228          12 : }
     229             : 
     230          11 : bool Client::request(const servus::Serializable& req, const ReplyFunc& func)
     231             : {
     232          22 :     const auto& data = req.toBinary();
     233          22 :     return request(req.getTypeIdentifier(), data.ptr.get(), data.size, func);
     234             : }
     235             : 
     236          14 : bool Client::request(const uint128_t& requestID, const void* data,
     237             :                      const size_t size, const ReplyFunc& func)
     238             : {
     239          14 :     return _impl->request(requestID, data, size, func);
     240             : }
     241             : 
     242           0 : const std::string& Client::getSession() const
     243             : {
     244           0 :     return _impl->getSession();
     245             : }
     246             : 
     247       55422 : void Client::addSockets(std::vector<detail::Socket>& entries)
     248             : {
     249       55422 :     _impl->addSockets(entries);
     250       55422 : }
     251             : 
     252          14 : bool Client::process(detail::Socket& socket)
     253             : {
     254          14 :     return _impl->process(socket);
     255             : }
     256             : 
     257       55422 : void Client::update()
     258             : {
     259       55422 :     _impl->update();
     260       55422 : }
     261             : 
     262           0 : void Client::addConnection(const std::string& uri)
     263             : {
     264           0 :     _impl->addConnection(uri);
     265           0 : }
     266          24 : }

Generated by: LCOV version 1.11