ZeroEQ  0.9.0
ZeroEQ - Zero Event Queue
zeroeq::Subscriber Class Reference

Subscribes to Publisher to receive events. More...

#include <subscriber.h>

+ Inheritance diagram for zeroeq::Subscriber:
+ Collaboration diagram for zeroeq::Subscriber:

Public Member Functions

 Subscriber ()
 Create a default subscriber. More...
 
 Subscriber (const std::string &session)
 Create a subscriber which subscribes to publisher(s) from the given session. More...
 
 Subscriber (const URIs &uris)
 Create a subscriber which subscribes to specific publishers. More...
 
 Subscriber (Receiver &shared)
 Create a default shared subscriber. More...
 
 Subscriber (const std::string &session, Receiver &shared)
 Create a shared subscriber which subscribes to publisher(s) from the given session. More...
 
 Subscriber (const URIs &uris, Receiver &shared)
 Create a shared subscriber which subscribes to publishers on the given URIs. More...
 
 ~Subscriber ()
 Destroy this subscriber and withdraw any subscriptions. More...
 
 Subscriber (const URI &uri)
 
 Subscriber (const URI &uri, Receiver &shared)
 
bool subscribe (servus::Serializable &serializable)
 Subscribe a serializable object to receive updates from any connected publisher. More...
 
bool subscribe (const uint128_t &event, const EventFunc &func)
 Subscribe to an event from any connected publisher. More...
 
bool subscribe (const uint128_t &event, const EventPayloadFunc &func)
 Subscribe to an event with payload from any connected publisher. More...
 
bool unsubscribe (const servus::Serializable &serializable)
 Unsubscribe a serializable object to stop applying updates from any connected publisher. More...
 
bool unsubscribe (const uint128_t &event)
 
const std::string & getSession () const
 
- Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver. More...
 
 Receiver (Receiver &shared)
 Create a shared receiver. More...
 
 Receiver (Receiver &&)
 
Receiveroperator= (Receiver &&)
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers. More...
 

Additional Inherited Members

Detailed Description

Subscribes to Publisher to receive events.

If the subscriber is in the same session as discovered publishers, it automatically subscribes to those publishers. Publishers from the same application instance are not considered though.

A subscription to a non-existing publisher is valid. It will start receiving events once the other publisher(s) is(are) publishing.

A receive on any Subscriber of a shared group will work on all subscribers and call the registered handlers.

Not thread safe.

Example:

/* Copyright (c) 2014-2017, Human Brain Project
* Daniel Nachbaur <daniel.nachbaur@epfl.ch>
* Stefan.Eilemann@epfl.ch
*/
#define BOOST_TEST_MODULE zeroeq_subscriber
#include "common.h"
#include <servus/servus.h>
BOOST_AUTO_TEST_CASE(construction)
{
BOOST_CHECK_NO_THROW(zeroeq::Subscriber());
BOOST_CHECK_NO_THROW(zeroeq::Subscriber subscriber("zeroeq_test_none"));
BOOST_CHECK_NO_THROW(zeroeq::Subscriber(zeroeq::URI("localhost:1234")));
BOOST_CHECK_NO_THROW(zeroeq::Subscriber((zeroeq::Receiver&)shared));
BOOST_CHECK_NO_THROW(zeroeq::Subscriber("zeroeq_test_none", shared));
BOOST_CHECK_NO_THROW(
zeroeq::Subscriber(zeroeq::URI("localhost:1234"), shared));
}
BOOST_AUTO_TEST_CASE(invalid_construction)
{
BOOST_CHECK_THROW(zeroeq::Subscriber{zeroeq::NULL_SESSION},
std::runtime_error);
BOOST_CHECK_THROW(zeroeq::Subscriber(""), std::runtime_error);
BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost")),
std::runtime_error);
BOOST_CHECK_THROW(zeroeq::Subscriber(
zeroeq::URI("deadbeef://badcoffee:1234")),
std::runtime_error);
BOOST_CHECK_THROW((zeroeq::Subscriber{zeroeq::NULL_SESSION, shared}),
std::runtime_error);
BOOST_CHECK_THROW(zeroeq::Subscriber("", shared), std::runtime_error);
BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"), shared),
std::runtime_error);
}
BOOST_AUTO_TEST_CASE(subscribe)
{
zeroeq::Subscriber subscriber;
test::Echo echo;
BOOST_CHECK(subscriber.subscribe(echo));
BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Empty"),
zeroeq::EventFunc([]() {})));
BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
[](const void*, size_t) {})));
}
BOOST_AUTO_TEST_CASE(unsubscribe)
{
zeroeq::Subscriber subscriber;
test::Echo echo;
BOOST_CHECK(subscriber.subscribe(echo));
BOOST_CHECK(subscriber.unsubscribe(echo));
BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Empty"),
zeroeq::EventFunc([]() {})));
BOOST_CHECK(subscriber.unsubscribe(zeroeq::make_uint128("Empty")));
BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
[](const void*, size_t) {})));
BOOST_CHECK(subscriber.unsubscribe(zeroeq::make_uint128("Echo")));
}
BOOST_AUTO_TEST_CASE(invalid_subscribe)
{
zeroeq::Subscriber subscriber;
test::Echo echo;
BOOST_CHECK(subscriber.subscribe(echo));
BOOST_CHECK(!subscriber.subscribe(echo));
BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
[](const void*, size_t) {})));
BOOST_CHECK(!subscriber.subscribe(zeroeq::make_uint128("Echo"),
[](const void*, size_t) {})));
}
BOOST_AUTO_TEST_CASE(test_invalid_unsubscribe)
{
zeroeq::Subscriber subscriber;
test::Echo echo;
BOOST_CHECK(!subscriber.unsubscribe(echo));
BOOST_CHECK(subscriber.subscribe(echo));
BOOST_CHECK(subscriber.unsubscribe(echo));
BOOST_CHECK(!subscriber.unsubscribe(echo));
}
BOOST_AUTO_TEST_CASE(test_invalid_unsubscribe_different_event_objects)
{
zeroeq::Subscriber subscriber;
test::Echo echo;
test::Empty empty;
BOOST_CHECK(subscriber.subscribe(echo));
BOOST_CHECK(!subscriber.unsubscribe(empty));
}
BOOST_AUTO_TEST_CASE(not_implemented_servus)
{
if (servus::Servus::isAvailable())
return;
BOOST_CHECK_THROW(zeroeq::Subscriber subscriber("zeroeq_test_none"),
std::runtime_error);
}

Definition at line 34 of file subscriber.h.

Constructor & Destructor Documentation

zeroeq::Subscriber::Subscriber ( )

Create a default subscriber.

Postconditions:

  • discovers publishers on _zeroeq_pub._tcp ZeroConf service
  • filters session <username> or ZEROEQ_PUB_SESSION from environment
Exceptions
std::runtime_errorif ZeroConf is not available
zeroeq::Subscriber::Subscriber ( const std::string &  session)
explicit

Create a subscriber which subscribes to publisher(s) from the given session.

Postconditions:

  • discovers publishers on _zeroeq_pub._tcp ZeroConf service
  • filters for given session
Parameters
sessionsession name used for filtering of discovered publishers
Exceptions
std::runtime_errorif ZeroConf is not available
zeroeq::Subscriber::Subscriber ( const URIs uris)
explicit

Create a subscriber which subscribes to specific publishers.

Postconditions:

  • connected to the publishers on the given URIs once publishers are running on the URIs
Parameters
urispublisher URIs in the format [scheme://]*|host|IP|IF:port
Exceptions
std::runtime_errorif an URI is not fully qualified
zeroeq::Subscriber::Subscriber ( Receiver shared)
explicit

Create a default shared subscriber.

See also
Subscriber()
Parameters
sharedanother receiver to share data reception with
zeroeq::Subscriber::Subscriber ( const std::string &  session,
Receiver shared 
)

Create a shared subscriber which subscribes to publisher(s) from the given session.

See also
Subscriber( const std::string& )
Parameters
sessiononly subscribe to publishers of the same session
sharedanother receiver to share data reception with
zeroeq::Subscriber::Subscriber ( const URIs uris,
Receiver shared 
)

Create a shared subscriber which subscribes to publishers on the given URIs.

See also
Subscriber( const URIs& )
Parameters
urispublisher URIs in the format [scheme://]*|host|IP|IF:port
sharedanother receiver to share data reception with
zeroeq::Subscriber::~Subscriber ( )

Destroy this subscriber and withdraw any subscriptions.

zeroeq::Subscriber::Subscriber ( const URI uri)
inlineexplicit
Parameters
uri
Deprecated:

Definition at line 106 of file subscriber.h.

zeroeq::Subscriber::Subscriber ( const URI uri,
Receiver shared 
)
inline
Parameters
shared
Deprecated:

Definition at line 110 of file subscriber.h.

References getSession(), subscribe(), and unsubscribe().

+ Here is the call graph for this function:

Member Function Documentation

const std::string& zeroeq::Subscriber::getSession ( ) const
Returns
the session name that is used for filtering.

Referenced by Subscriber().

+ Here is the caller graph for this function:

bool zeroeq::Subscriber::subscribe ( servus::Serializable &  serializable)

Subscribe a serializable object to receive updates from any connected publisher.

Every update will be directly applied on the object during receive(). To track updates on the object, the serializable's updated function is called accordingly.

The subscribed object instance has to be valid until unsubscribe().

Parameters
serializablethe object to update on receive()
Returns
true if subscription was successful, false otherwise

Referenced by Subscriber().

+ Here is the caller graph for this function:

bool zeroeq::Subscriber::subscribe ( const uint128_t &  event,
const EventFunc func 
)

Subscribe to an event from any connected publisher.

Every receival of the event will call the registered callback function.

Parameters
eventthe event identifier to subscribe to
functhe callback function called upon receival
Returns
true if subscription was successful, false otherwise
bool zeroeq::Subscriber::subscribe ( const uint128_t &  event,
const EventPayloadFunc func 
)

Subscribe to an event with payload from any connected publisher.

Every receival of the event will call the registered callback function.

Parameters
eventthe event identifier to subscribe to
functhe callback function called upon receival
Returns
true if subscription was successful, false otherwise
bool zeroeq::Subscriber::unsubscribe ( const servus::Serializable &  serializable)

Unsubscribe a serializable object to stop applying updates from any connected publisher.

Parameters
serializablethe object to stop updating on receive()
Returns
true if removal of subscription was successful, false otherwise

Referenced by Subscriber().

+ Here is the caller graph for this function:


The documentation for this class was generated from the following file: