ZeroEQ  0.9.0
ZeroEQ - Zero Event Queue
zeroeq::Client Class Reference

Requests a remote procedure call on a Server.

#include <client.h>


Public Member Functions

 Client ()
 Create a default client.

 Client (const std::string &session)
 Create a client which connects to servers from the given session.

 Client (const URIs &uris)
 Create a client which subscribes to specific servers.

 Client (Receiver &shared)
 Create a default shared client.

 Client (const std::string &session, Receiver &shared)
 Create a shared client which subscribes to servers from the given session.

 Client (const URIs &uris, Receiver &shared)
 Create a shared client which subscribes to servers on the given URIs.

 ~Client ()
 Destroy this client.

bool request (const servus::Serializable &request, const ReplyFunc &func)
 Request the execution of the given data on a connected Server.

bool request (const uint128_t &request, const void *data, size_t size, const ReplyFunc &func)
 Request the execution of the given data on a connected Server.

const std::string & getSession () const

Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver.

 Receiver (Receiver &shared)
 Create a shared receiver.

 Receiver (Receiver &&)

Receiver & operator= (Receiver &&)

bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers.

Detailed Description

Requests a remote procedure call on a Server.

If the client is in the same session as discovered servers, it automatically connects to those servers. Requests are executed round-robin on all connected servers.
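
The round-robin policy can be pictured with a small self-contained model. This is only a sketch: `RoundRobin` and the server name strings are hypothetical and not part of ZeroEQ, and the code does not show how the library itself distributes requests.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the set of connected servers; not a ZeroEQ type.
class RoundRobin
{
public:
    explicit RoundRobin(std::vector<std::string> servers)
        : _servers(std::move(servers))
    {
    }

    // Returns the server that would receive the next request, cycling
    // through all connected servers in order. Returns an empty string if
    // no server is connected.
    std::string next()
    {
        if (_servers.empty())
            return {};
        const std::string server = _servers[_next];
        _next = (_next + 1) % _servers.size();
        return server;
    }

private:
    std::vector<std::string> _servers;
    std::size_t _next = 0;
};
```

With two servers in the model, three consecutive calls to `next()` select the first, the second, and then the first server again.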

A connection to a non-existing server is valid. Requests will be executed once the servers are available.

A receive on any instance of a shared group will receive on all instances and call the registered handlers.

The session of a Client-Server setup is typically semantically different from the session of a Publisher-Subscriber setup: In the former, it identifies a service provided, i.e., the type of requests served. In the latter, it identifies an interest scope, e.g., all the applications of a given user.

Not thread safe.

Example:

/* Copyright (c) 2017, Human Brain Project
 * Stefan.Eilemann@epfl.ch
 */
#define BOOST_TEST_MODULE zeroeq_req_rep
#include "common.h"
#include <servus/servus.h>
#include <servus/uri.h>
#include <atomic>
#include <chrono>
#include <thread>
namespace
{
static const float TIMEOUT = 1000.f; // milliseconds
// Run from a thread: No BOOST_CHECK macros allowed. throw up instead.
template <class R>
bool runOnce(zeroeq::Server& server, const test::Echo& request, const R& reply)
{
    bool handled = false;
    const auto func = [&](const void* data, const size_t size) {
        if (!((data && size) || (!data && !size)))
            throw std::runtime_error("Unexpected handle parameters");
        if (handled)
            throw std::runtime_error("Already handled request");
        if (data)
        {
            test::Echo got;
            got.fromBinary(data, size);
            if (got != request)
                throw std::runtime_error("Request does not match expectation");
        }
        handled = true;
        return zeroeq::ReplyData{R::IDENTIFIER(), reply.toBinary()};
    };
    if (!server.handle(test::Echo::IDENTIFIER(), func) ||
        !server.handle(test::Empty::IDENTIFIER(), func) ||
        server.handle(test::Empty::IDENTIFIER(), func))
    {
        throw std::runtime_error("Handler registration failed");
    }
    if (handled)
        throw std::runtime_error("Already handled a request");
    if (!server.receive(TIMEOUT))
        throw std::runtime_error("No request handled");
    if (!server.remove(test::Echo::IDENTIFIER()) ||
        !server.remove(test::Empty::IDENTIFIER()))
    {
        throw std::runtime_error("Can't remove request handler");
    }
    if (server.remove(test::Echo::IDENTIFIER()))
        throw std::runtime_error("Can remove removed request handler");
    return handled;
}
class EchoThrows : public test::Echo
{
public:
    explicit EchoThrows(const std::string& message)
        : Echo(message)
    {
    }
private:
    Data _toBinary() const final
    {
        throw std::runtime_error("I've had enough!");
        return Data();
    }
};
}
BOOST_AUTO_TEST_CASE(serializable)
{
    test::Echo echo("The quick brown fox");
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
#if (ZMQ_VERSION >= 40104)
    test::Monitor monitor(server);
#endif
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, echo, reply); });
    bool handled = false;
    client.request(echo, [&](const zeroeq::uint128_t& type, const void* data,
                             const size_t size) {
        BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
        BOOST_CHECK(data);
        BOOST_CHECK(!handled);
        test::Echo got;
        got.fromBinary(data, size);
        BOOST_CHECK_EQUAL(got, reply);
        handled = true;
    });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK(handled);
#if (ZMQ_VERSION >= 40104)
    BOOST_CHECK_EQUAL(monitor.connections, 0);
    BOOST_CHECK(monitor.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(monitor.connections, 1);
#endif
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(empty_request_raw)
{
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
#if (ZMQ_VERSION >= 40104)
    test::Monitor monitor(server, client);
#endif
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, {}, reply); });
    bool handled = false;
    client.request(test::Echo::IDENTIFIER(), nullptr, 0,
                   [&](const zeroeq::uint128_t& type, const void* data,
                       const size_t size) {
                       BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
                       BOOST_CHECK(data);
                       BOOST_CHECK(!handled);
                       test::Echo got;
                       got.fromBinary(data, size);
                       BOOST_CHECK_EQUAL(got, reply);
                       handled = true;
                   });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
#if (ZMQ_VERSION >= 40104)
    if (!handled || monitor.connections == 0)
    {
        BOOST_CHECK(client.receive(TIMEOUT));
    }
    BOOST_CHECK_EQUAL(monitor.connections, 1);
#endif
    BOOST_CHECK(handled);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(issue224)
{
    test::Echo echo("The quick brown fox");
    const EchoThrows reply("Jumped over the lazy dog");
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, echo, reply); });
    bool handled = false;
    client.request(echo, [&](const zeroeq::uint128_t& type, const void* data,
                             const size_t size) {
        BOOST_CHECK_EQUAL(type, 0);
        BOOST_CHECK(!data);
        BOOST_CHECK_EQUAL(size, 0);
        BOOST_CHECK(!handled);
        handled = true;
    });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK(handled);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(empty_request_object)
{
    zeroeq::Server server(zeroeq::URI("inproc://zeroeq.test.empty_request_raw"),
                          zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
    const test::Echo reply("Jumped over the lazy dog");
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, {}, reply); });
    bool handled = false;
    client.request(test::Empty(), [&](const zeroeq::uint128_t& type,
                                      const void* data, const size_t size) {
        BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
        BOOST_CHECK(data);
        BOOST_CHECK(!handled);
        test::Echo got;
        got.fromBinary(data, size);
        BOOST_CHECK_EQUAL(got, reply);
        handled = true;
    });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK(handled);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(empty_reqrep)
{
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
    const test::Empty reply{};
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, {}, reply); });
    bool handled = false;
    client.request(test::Echo::IDENTIFIER(), nullptr, 0,
                   [&](const zeroeq::uint128_t& type, const void* data,
                       const size_t size) {
                       BOOST_CHECK_EQUAL(type, test::Empty::IDENTIFIER());
                       BOOST_CHECK(!data);
                       BOOST_CHECK_EQUAL(size, 0);
                       BOOST_CHECK(!handled);
                       handled = true;
                   });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK(handled);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(unhandled_request)
{
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client({server.getURI()});
    const test::Empty reply{};
    bool serverHandled = false;
    std::thread thread([&] { serverHandled = runOnce(server, {}, reply); });
    bool handled = false;
    client.request(servus::make_UUID(), nullptr, 0,
                   [&](const zeroeq::uint128_t& type, const void* data,
                       const size_t size) {
                       BOOST_CHECK_EQUAL(type, servus::uint128_t());
                       BOOST_CHECK(!data);
                       BOOST_CHECK_EQUAL(size, 0);
                       BOOST_CHECK(!handled);
                       handled = true;
                   });
    BOOST_CHECK(!handled);
    BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK(handled);
    thread.join();
    BOOST_CHECK(!serverHandled);
}
BOOST_AUTO_TEST_CASE(two_servers)
{
    test::Echo echo("The quick brown fox");
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server1(zeroeq::NULL_SESSION);
    zeroeq::Server server2(zeroeq::NULL_SESSION);
    zeroeq::Client client(zeroeq::URIs{server1.getURI(), server2.getURI()});
    bool serverHandled = true;
    std::thread thread1([&] {
        if (!runOnce(server1, echo, reply))
            serverHandled = false;
    });
    std::thread thread2([&] {
        if (!runOnce(server2, echo, reply))
            serverHandled = false;
    });
    size_t handled = 0;
    const auto func = [&](const zeroeq::uint128_t& type, const void* data,
                          const size_t size) {
        BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
        BOOST_CHECK(data);
        test::Echo got;
        got.fromBinary(data, size);
        BOOST_CHECK_EQUAL(got, reply);
        ++handled;
    };
    client.request(echo, func);
    client.request(echo, func);
    BOOST_CHECK_EQUAL(handled, 0);
    BOOST_CHECK(client.receive(TIMEOUT));
    if (handled < 2)
        BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(handled, 2);
    BOOST_CHECK(!client.receive(TIMEOUT / 10));
    BOOST_CHECK_EQUAL(handled, 2);
    thread1.join();
    thread2.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(envconnect)
{
    test::Echo echo("The quick brown fox");
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server1(zeroeq::NULL_SESSION);
    zeroeq::Server server2(zeroeq::NULL_SESSION);
    std::string servers(server1.getURI().getHost() + ":" +
                        std::to_string(int(server1.getURI().getPort())) + "," +
                        server2.getURI().getHost() + ":" +
                        std::to_string(int(server2.getURI().getPort())));
    setenv("ZEROEQ_SERVERS", servers.c_str(), 1);
    // Assumption: this listing omits the client declaration. A default
    // client connects to the servers listed in ZEROEQ_SERVERS.
    zeroeq::Client client;
    bool handled1 = false;
    bool handled2 = false;
    std::thread thread1([&] { handled1 = runOnce(server1, echo, reply); });
    std::thread thread2([&] { handled2 = runOnce(server2, echo, reply); });
    size_t handled = 0;
    const auto func = [&](const zeroeq::uint128_t&, const void*, size_t) {
        ++handled;
    };
    client.request(echo, func);
    client.request(echo, func);
    BOOST_CHECK_EQUAL(handled, 0);
    BOOST_CHECK(client.receive(TIMEOUT));
    if (handled < 2)
        BOOST_CHECK(client.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(handled, 2);
    thread1.join();
    thread2.join();
    BOOST_CHECK(handled1);
    BOOST_CHECK(handled2);
}
BOOST_AUTO_TEST_CASE(two_clients)
{
    test::Echo echo("The quick brown fox");
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client1({server.getURI()});
    zeroeq::Client client2({server.getURI()});
    bool serverHandled = false;
    std::thread thread([&] {
        serverHandled =
            runOnce(server, echo, reply) && runOnce(server, echo, reply);
    });
    size_t handled = 0;
    const auto func = [&](const zeroeq::uint128_t& type, const void* data,
                          const size_t size) {
        BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
        BOOST_CHECK(data);
        test::Echo got;
        got.fromBinary(data, size);
        BOOST_CHECK_EQUAL(got, reply);
        ++handled;
    };
    client1.request(echo, func);
    client2.request(echo, func);
    BOOST_CHECK_EQUAL(handled, 0);
    BOOST_CHECK(client1.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(handled, 1);
    BOOST_CHECK(!client1.receive(TIMEOUT / 10));
    BOOST_CHECK_EQUAL(handled, 1);
    BOOST_CHECK(client2.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(handled, 2);
    BOOST_CHECK(!client2.receive(TIMEOUT / 10));
    BOOST_CHECK_EQUAL(handled, 2);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(two_clients_shared)
{
    test::Echo echo("The quick brown fox");
    const test::Echo reply("Jumped over the lazy dog");
    zeroeq::Server server(zeroeq::NULL_SESSION);
    zeroeq::Client client1({server.getURI()});
    zeroeq::Client client2({server.getURI()}, client1);
    bool serverHandled = false;
    std::thread thread([&] {
        serverHandled =
            runOnce(server, echo, reply) && runOnce(server, echo, reply);
    });
    size_t handled = 0;
    const auto func = [&](const zeroeq::uint128_t& type, const void* data,
                          const size_t size) {
        BOOST_CHECK_EQUAL(type, test::Echo::IDENTIFIER());
        BOOST_CHECK(data);
        test::Echo got;
        got.fromBinary(data, size);
        BOOST_CHECK_EQUAL(got, reply);
        ++handled;
    };
    client1.request(echo, func);
    client2.request(echo, func);
    BOOST_CHECK_EQUAL(handled, 0);
    BOOST_CHECK(client1.receive(TIMEOUT));
    if (handled < 2)
        BOOST_CHECK(client1.receive(TIMEOUT));
    BOOST_CHECK_EQUAL(handled, 2);
    BOOST_CHECK(!client1.receive(TIMEOUT / 10));
    BOOST_CHECK_EQUAL(handled, 2);
    thread.join();
    BOOST_CHECK(serverHandled);
}
BOOST_AUTO_TEST_CASE(exceptions)
{
    BOOST_CHECK_THROW(zeroeq::Server(""), std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Server(zeroeq::URI("141.1.1.1")),
                      std::runtime_error);
}

Definition at line 34 of file client.h.

Constructor & Destructor Documentation

zeroeq::Client::Client ( )

Create a default client.

Postconditions:

  • connects to all servers set in the comma-separated environment variable ZEROEQ_SERVERS
  • discovers servers on _zeroeq_rep._tcp ZeroConf service
  • filters session <username> or ZEROEQ_SERVER_SESSION from environment
Exceptions
  std::runtime_error  if ZeroConf is not available
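
The first postcondition relies on a comma-separated list in ZEROEQ_SERVERS. The following sketch shows one way such a value could be split into `host:port` entries. The helper names `splitServers` and `serversFromEnvironment` are hypothetical, and the code is not taken from ZeroEQ; only the variable name and the comma separator come from the description above.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

// Splits a comma-separated list such as "host1:1234,host2:5678" into its
// entries, skipping empty ones. Hypothetical helper, not a ZeroEQ function.
std::vector<std::string> splitServers(const std::string& value)
{
    std::vector<std::string> servers;
    std::istringstream stream(value);
    std::string entry;
    while (std::getline(stream, entry, ','))
    {
        if (!entry.empty())
            servers.push_back(entry);
    }
    return servers;
}

// Reads ZEROEQ_SERVERS and returns its entries, or an empty list if the
// variable is not set.
std::vector<std::string> serversFromEnvironment()
{
    const char* value = std::getenv("ZEROEQ_SERVERS");
    return value ? splitServers(value) : std::vector<std::string>{};
}
```

The example test case `envconnect` builds a value of exactly this shape from the host and port of two servers before calling `setenv`.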
zeroeq::Client::Client ( const std::string &  session)
explicit

Create a client which connects to servers from the given session.

Postconditions:

  • connects to all servers set in the comma-separated environment variable ZEROEQ_SERVERS
  • discovers servers on _zeroeq_rep._tcp ZeroConf service
  • filters for given session
Parameters
  session  session name used for filtering of discovered servers
Exceptions
  std::runtime_error  if ZeroConf is not available
zeroeq::Client::Client ( const URIs &  uris)
explicit

Create a client which subscribes to specific servers.

Postconditions:

  • connected to the servers on the given URIs once servers are running
Parameters
  uris  server URIs in the format *|host|IP|IF:port
Exceptions
  std::runtime_error  if URI is not fully qualified
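
As an illustration of the URI format, the sketch below checks a `host:port` string. It assumes that "fully qualified" means both a non-empty host part and a nonzero port are present; that reading is an assumption, and `HostPort` and `parseHostPort` are hypothetical names that do not exist in ZeroEQ.

```cpp
#include <cstdint>
#include <string>

// Hypothetical result type for the sketch; not a ZeroEQ type.
struct HostPort
{
    std::string host;
    std::uint16_t port;
};

// Parses "host:port" and returns true only if both parts are present and
// the port is a number in the range 1..65535.
bool parseHostPort(const std::string& text, HostPort& out)
{
    const std::string::size_type colon = text.rfind(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == text.size())
        return false;

    unsigned long port = 0;
    for (std::string::size_type i = colon + 1; i < text.size(); ++i)
    {
        const char c = text[i];
        if (c < '0' || c > '9')
            return false;
        port = port * 10 + static_cast<unsigned long>(c - '0');
        if (port > 65535)
            return false;
    }
    if (port == 0)
        return false;

    out.host = text.substr(0, colon);
    out.port = static_cast<std::uint16_t>(port);
    return true;
}
```

Under this assumption, a string without a port (for example a bare IP address) is rejected, which matches the spirit of the `exceptions` test case in the example above, where a Server is constructed from a URI that lacks a scheme and port.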
zeroeq::Client::Client ( Receiver &  shared)
explicit

Create a default shared client.

See also
Client()
Parameters
  shared  another receiver to share data reception with
zeroeq::Client::Client ( const std::string &  session, Receiver &  shared )

Create a shared client which subscribes to servers from the given session.

See also
Client( const std::string& )
Parameters
  session  only subscribe to servers of the same session
  shared   another receiver to share data reception with
zeroeq::Client::Client ( const URIs &  uris, Receiver &  shared )

Create a shared client which subscribes to servers on the given URIs.

See also
Client( const URIs& )
Parameters
  uris    server URIs in the format *|host|IP|IF:port
  shared  another receiver to share data reception with
zeroeq::Client::~Client ( )

Destroy this client.

Member Function Documentation

const std::string& zeroeq::Client::getSession ( ) const
Returns
  the session name that is used for filtering.
bool zeroeq::Client::request ( const servus::Serializable &  request, const ReplyFunc &  func )

Request the execution of the given data on a connected Server.

The reply function will be executed during receive(). May block when all servers are overloaded or no server is connected.

The reply function is called with (0, nullptr, 0) if the server does not have a handler for the request or if the handler threw an exception.

Parameters
  request  the request identifier and payload
  func     the function to execute for the reply
Returns
  true if the request was sent, false on error
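
A reply function therefore has to tell a failed request apart from a valid reply without payload. The sketch below models that check with stand-in types: `Id128` is a hypothetical replacement for zeroeq::uint128_t, and `ReplyStatus` and `classifyReply` are illustrative names that do not exist in ZeroEQ. The three arguments mirror the reply callbacks used in the example above.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical 128-bit identifier standing in for zeroeq::uint128_t.
struct Id128
{
    std::uint64_t high;
    std::uint64_t low;

    bool isZero() const { return high == 0 && low == 0; }
};

enum class ReplyStatus
{
    failed,  // (0, nullptr, 0): no handler on the server, or handler threw
    empty,   // valid reply type without payload
    payload  // valid reply type with payload
};

// Classifies the arguments a reply function receives.
ReplyStatus classifyReply(const Id128& type, const void* data,
                          const std::size_t size)
{
    if (type.isZero() && !data && size == 0)
        return ReplyStatus::failed;
    return (data && size > 0) ? ReplyStatus::payload : ReplyStatus::empty;
}
```

In the example above, the `issue224` and `unhandled_request` cases correspond to the failed branch, while `empty_reqrep` receives a non-zero reply type with no data, which this sketch treats as a valid empty reply.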
bool zeroeq::Client::request ( const uint128_t &  request, const void *  data, size_t  size, const ReplyFunc &  func )

Request the execution of the given data on a connected Server.

See request() overload above for details.

Parameters
  request  the request identifier
  data     the payload data of the request, may be nullptr
  size     the size of the payload data, may be 0
  func     the function to execute for the reply

The documentation for this class was generated from the following file: