ZeroEQ  0.8.0
ZeroEQ - Zero Event Queue
zeroeq::Subscriber Class Reference

Subscribes to Publisher to receive events.

#include <subscriber.h>


Public Member Functions

 Subscriber ()
 Create a default subscriber.
 
 Subscriber (const std::string &session)
 Create a subscriber which subscribes to publisher(s) from the given session.
 
 Subscriber (const URI &uri)
 Create a subscriber which subscribes to a specific publisher.
 
 Subscriber (const URI &uri, const std::string &session)
 Create a subscriber which subscribes to publisher(s) on the given URI.
 
 Subscriber (Receiver &shared)
 Create a default shared subscriber.
 
 Subscriber (const std::string &session, Receiver &shared)
 Create a shared subscriber which subscribes to publisher(s) from the given session.
 
 Subscriber (const URI &uri, Receiver &shared)
 Create a shared subscriber which subscribes to publisher(s) on the given URI.
 
 Subscriber (const URI &uri, const std::string &session, Receiver &shared)
 Create a shared subscriber which subscribes to publisher(s) on the given URI.
 
 ~Subscriber ()
 Destroy this subscriber and withdraw any subscriptions.
 
bool subscribe (servus::Serializable &serializable)
 Subscribe a serializable object to receive updates from any connected publisher.
 
bool subscribe (const uint128_t &event, const EventFunc &func)
 Subscribe to an event from any connected publisher.
 
bool subscribe (const uint128_t &event, const EventPayloadFunc &func)
 Subscribe to an event with payload from any connected publisher.
 
bool unsubscribe (const servus::Serializable &serializable)
 Unsubscribe a serializable object to stop applying updates from any connected publisher.
 
bool unsubscribe (const uint128_t &event)
 
const std::string & getSession () const
 
Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver.
 
 Receiver (Receiver &shared)
 Create a shared receiver.
 
virtual ~Receiver ()
 Destroy this receiver.
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers.
 

Additional Inherited Members

Protected Member Functions inherited from zeroeq::Receiver
void * getZMQContext ()
 

Detailed Description

Subscribes to Publisher to receive events.

If the subscriber is in the same session as discovered publishers, it automatically subscribes to those publishers. Publishers from the same application instance are ignored, however.

Subscribing to a publisher that does not exist yet is valid. The subscriber starts receiving events once such a publisher begins publishing.

Calling receive() on any Subscriber of a shared group processes events for all subscribers in the group and calls their registered handlers.

Not thread safe.
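
The shared-group behaviour described above can be pictured with a small stand-in model. This is only a sketch: `ToyReceiver` and its handler list are hypothetical names invented for illustration, it involves no networking, and it is not how ZeroEQ implements shared receivers. It only shows that a receive on any member of a group runs the handlers of every member.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy model of a shared receiver group; NOT the ZeroEQ implementation.
class ToyReceiver
{
public:
    using Handler = std::function<void()>;

    // Standalone receiver: starts a new group containing only its handler.
    explicit ToyReceiver(Handler handler)
        : _group(std::make_shared<std::vector<Handler>>())
    {
        _group->push_back(std::move(handler));
    }

    // Shared receiver: joins the group of 'shared'.
    ToyReceiver(ToyReceiver& shared, Handler handler)
        : _group(shared._group)
    {
        _group->push_back(std::move(handler));
    }

    // Runs the handlers of every receiver in the group and
    // returns the number of handlers that were called.
    std::size_t receive()
    {
        for (const auto& handler : *_group)
            handler();
        return _group->size();
    }

private:
    std::shared_ptr<std::vector<Handler>> _group;
};
```

In this model, constructing a second `ToyReceiver` from a first one and calling `receive()` on either of them invokes both handlers. The model never removes a handler when a receiver is destroyed, which a real implementation would have to handle.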

Example:

/* Copyright (c) 2014, Human Brain Project
* Daniel Nachbaur <daniel.nachbaur@epfl.ch>
* Stefan.Eilemann@epfl.ch
*/
#define BOOST_TEST_MODULE zeroeq_subscriber
#include "broker.h"
#include <servus/servus.h>
BOOST_AUTO_TEST_CASE(construction)
{
    BOOST_CHECK_NO_THROW(zeroeq::Subscriber());
    BOOST_CHECK_NO_THROW(
        zeroeq::Subscriber subscriber(test::buildUniqueSession()));
    BOOST_CHECK_NO_THROW(zeroeq::Subscriber(zeroeq::URI("localhost:1234")));
    BOOST_CHECK_NO_THROW(
        zeroeq::Subscriber(zeroeq::URI("localhost"), zeroeq::DEFAULT_SESSION));

    // Receiver shared by the subscribers constructed below
    zeroeq::Subscriber shared;
    BOOST_CHECK_NO_THROW(zeroeq::Subscriber((zeroeq::Receiver&)shared));
    BOOST_CHECK_NO_THROW(
        zeroeq::Subscriber(test::buildUniqueSession(), shared));
    BOOST_CHECK_NO_THROW(
        zeroeq::Subscriber(zeroeq::URI("localhost:1234"), shared));
    BOOST_CHECK_NO_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"),
                                            zeroeq::DEFAULT_SESSION, shared));
    BOOST_CHECK_NO_THROW(zeroeq::Subscriber(zeroeq::URI("localhost:1234"),
                                            zeroeq::DEFAULT_SESSION, shared));
}
BOOST_AUTO_TEST_CASE(invalid_construction)
{
    BOOST_CHECK_THROW(zeroeq::Subscriber subscriber(zeroeq::NULL_SESSION),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(""), std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost")),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"),
                                         zeroeq::NULL_SESSION),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"), ""),
                      std::runtime_error);

    // Receiver shared by the subscribers constructed below
    zeroeq::Subscriber shared;
    BOOST_CHECK_THROW(zeroeq::Subscriber subscriber(zeroeq::NULL_SESSION,
                                                    shared),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber("", shared), std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"), shared),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"),
                                         zeroeq::NULL_SESSION, shared),
                      std::runtime_error);
    BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"), "", shared),
                      std::runtime_error);
}
BOOST_AUTO_TEST_CASE(subscribe)
{
    zeroeq::Subscriber subscriber;
    test::Echo echo;
    BOOST_CHECK(subscriber.subscribe(echo));
    BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Empty"),
                                     zeroeq::EventFunc([]() {})));
    BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
                                     zeroeq::EventPayloadFunc(
                                         [](const void*, size_t) {})));
}
BOOST_AUTO_TEST_CASE(unsubscribe)
{
    zeroeq::Subscriber subscriber;
    test::Echo echo;
    BOOST_CHECK(subscriber.subscribe(echo));
    BOOST_CHECK(subscriber.unsubscribe(echo));
    BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Empty"),
                                     zeroeq::EventFunc([]() {})));
    BOOST_CHECK(subscriber.unsubscribe(zeroeq::make_uint128("Empty")));
    BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
                                     zeroeq::EventPayloadFunc(
                                         [](const void*, size_t) {})));
    BOOST_CHECK(subscriber.unsubscribe(zeroeq::make_uint128("Echo")));
}
BOOST_AUTO_TEST_CASE(invalid_subscribe)
{
    zeroeq::Subscriber subscriber;
    test::Echo echo;
    BOOST_CHECK(subscriber.subscribe(echo));
    // Subscribing the same object or event twice fails
    BOOST_CHECK(!subscriber.subscribe(echo));
    BOOST_CHECK(subscriber.subscribe(zeroeq::make_uint128("Echo"),
                                     zeroeq::EventPayloadFunc(
                                         [](const void*, size_t) {})));
    BOOST_CHECK(!subscriber.subscribe(zeroeq::make_uint128("Echo"),
                                      zeroeq::EventPayloadFunc(
                                          [](const void*, size_t) {})));
}
BOOST_AUTO_TEST_CASE(test_invalid_unsubscribe)
{
    zeroeq::Subscriber subscriber;
    test::Echo echo;
    BOOST_CHECK(!subscriber.unsubscribe(echo));
    BOOST_CHECK(subscriber.subscribe(echo));
    BOOST_CHECK(subscriber.unsubscribe(echo));
    BOOST_CHECK(!subscriber.unsubscribe(echo));
}
BOOST_AUTO_TEST_CASE(test_invalid_unsubscribe_different_event_objects)
{
    zeroeq::Subscriber subscriber;
    test::Echo echo;
    test::Empty empty;
    BOOST_CHECK(subscriber.subscribe(echo));
    BOOST_CHECK(!subscriber.unsubscribe(empty));
}
BOOST_AUTO_TEST_CASE(not_implemented_servus)
{
    if (servus::Servus::isAvailable())
        return;
    const zeroeq::URI uri(test::buildUniqueSession());
    BOOST_CHECK_THROW(zeroeq::Subscriber subscriber(uri), std::runtime_error);
}

Definition at line 33 of file subscriber.h.

Constructor & Destructor Documentation

zeroeq::Subscriber::Subscriber ( )

Create a default subscriber.

Postconditions:

  • discovers publishers on the _zeroeq_pub._tcp ZeroConf service
  • filters for the session <username>, or for the value of ZEROEQ_SESSION from the environment
Exceptions
std::runtime_error: if ZeroConf is not available
zeroeq::Subscriber::Subscriber ( const std::string &  session)
explicit

Create a subscriber which subscribes to publisher(s) from the given session.

Postconditions:

  • discovers publishers on _zeroeq_pub._tcp ZeroConf service
  • filters for given session
Parameters
session: session name used for filtering of discovered publishers
Exceptions
std::runtime_error: if ZeroConf is not available
zeroeq::Subscriber::Subscriber ( const URI &  uri)
explicit

Create a subscriber which subscribes to a specific publisher.

Postconditions:

  • connects to the publisher on the given URI once a publisher is running on that URI
Parameters
uri: publisher URI in the format [scheme://]*|host|IP|IF:port
Exceptions
std::runtime_error: if the URI is not fully qualified
zeroeq::Subscriber::Subscriber ( const URI &  uri,
const std::string &  session 
)

Create a subscriber which subscribes to publisher(s) on the given URI.

Discovery and filtering by session are only used if the URI is not fully qualified.

Postconditions:

  • discovers publishers on the _zeroeq_pub._tcp ZeroConf service if the URI is not fully qualified
  • if the session is zeroeq::DEFAULT_SESSION, filters for the session <username>, or for the value of ZEROEQ_SESSION from the environment
Parameters
uri: publisher URI in the format [scheme://][*|host|IP|IF][:port]
session: session name used for filtering of discovered publishers
Exceptions
std::runtime_error: if ZeroConf is not available or if the session name is invalid
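
To make the "fully qualified" distinction concrete, the sketch below splits a string of the form [scheme://][host][:port] and treats it as fully qualified only when both a host and a port are present. This reading is an assumption drawn from the two URI formats documented here. `ParsedURI`, `parseURI` and `isFullyQualified` are hypothetical helpers written for this illustration, not the zeroeq::URI API, and the parser ignores IPv6 literals and does no error handling for non-numeric ports.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper type, not part of ZeroEQ.
struct ParsedURI
{
    std::string scheme;
    std::string host;       // may be "*", a host name, an IP or an interface
    std::uint16_t port = 0; // 0 means "no port given"
};

// Splits "[scheme://][host][:port]" into its parts.
ParsedURI parseURI(const std::string& text)
{
    ParsedURI uri;
    std::string rest = text;

    const auto schemeEnd = rest.find("://");
    if (schemeEnd != std::string::npos)
    {
        uri.scheme = rest.substr(0, schemeEnd);
        rest = rest.substr(schemeEnd + 3);
    }

    const auto colon = rest.rfind(':');
    if (colon == std::string::npos)
    {
        uri.host = rest; // host only, no port
        return uri;
    }

    uri.host = rest.substr(0, colon);
    const std::string portText = rest.substr(colon + 1);
    if (!portText.empty())
        uri.port = static_cast<std::uint16_t>(std::stoul(portText));
    return uri;
}

// Assumption: "fully qualified" means both host and port are given.
bool isFullyQualified(const ParsedURI& uri)
{
    return !uri.host.empty() && uri.port != 0;
}
```

Under this assumption "localhost:1234" is fully qualified, so no discovery would be needed, while "localhost" alone is not and would rely on discovery and session filtering.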
zeroeq::Subscriber::Subscriber ( Receiver &  shared)
explicit

Create a default shared subscriber.

See also
Subscriber()
Parameters
shared: another receiver to share data reception with
zeroeq::Subscriber::Subscriber ( const std::string &  session,
Receiver &  shared 
)

Create a shared subscriber which subscribes to publisher(s) from the given session.

See also
Subscriber( const std::string& )
Parameters
session: only subscribe to publishers of the same session
shared: another receiver to share data reception with
zeroeq::Subscriber::Subscriber ( const URI &  uri,
Receiver &  shared 
)

Create a shared subscriber which subscribes to publisher(s) on the given URI.

See also
Subscriber( const URI& )
Parameters
uri: publisher URI in the format [scheme://]*|host|IP|IF:port
shared: another receiver to share data reception with
zeroeq::Subscriber::Subscriber ( const URI &  uri,
const std::string &  session,
Receiver &  shared 
)

Create a shared subscriber which subscribes to publisher(s) on the given URI.

See also
Subscriber( const URI&, const std::string& )
Parameters
uri: publisher URI in the format [scheme://][*|host|IP|IF][:port]
session: session name used for filtering of discovered publishers
shared: another receiver to share data reception with
zeroeq::Subscriber::~Subscriber ( )

Destroy this subscriber and withdraw any subscriptions.

Member Function Documentation

const std::string& zeroeq::Subscriber::getSession ( ) const
Returns
the session name that is used for filtering.
bool zeroeq::Subscriber::subscribe ( servus::Serializable &  serializable)

Subscribe a serializable object to receive updates from any connected publisher.

Every update will be directly applied on the object during receive(). To track updates on the object, the serializable's updated function is called accordingly.

The subscribed object instance has to be valid until unsubscribe().

Parameters
serializable: the object to update on receive()
Returns
true if subscription was successful, false otherwise
bool zeroeq::Subscriber::subscribe ( const uint128_t &  event,
const EventFunc &  func 
)

Subscribe to an event from any connected publisher.

Each time the event is received, the registered callback function is called.

Parameters
event: the event identifier to subscribe to
func: the callback function called when the event is received
Returns
true if subscription was successful, false otherwise
bool zeroeq::Subscriber::subscribe ( const uint128_t &  event,
const EventPayloadFunc &  func 
)

Subscribe to an event with payload from any connected publisher.

Each time the event is received, the registered callback function is called.

Parameters
event: the event identifier to subscribe to
func: the callback function called when the event is received
Returns
true if subscription was successful, false otherwise
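
A payload callback receives a raw pointer and a byte count, as the lambdas in the class example suggest. The sketch below shows one way such a callback might copy the bytes into a string before they go out of scope. `PayloadFunc` is a local alias whose signature is assumed from those lambdas, and `makeTextCollector` is a hypothetical helper, not a ZeroEQ function. Whether the payload buffer stays valid after the callback returns is not stated here, so the sketch copies it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Assumed callback shape, inferred from the lambdas in the class example.
using PayloadFunc = std::function<void(const void*, std::size_t)>;

// Hypothetical helper: builds a callback that copies each received
// payload into 'target', interpreting the bytes as text.
// 'target' must outlive the returned callback.
PayloadFunc makeTextCollector(std::string& target)
{
    return [&target](const void* data, const std::size_t size) {
        if (data && size > 0)
            target.assign(static_cast<const char*>(data), size);
        else
            target.clear(); // empty payload
    };
}
```

A callback built this way could then be passed as the second argument of subscribe() in place of the empty lambdas used in the tests.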
bool zeroeq::Subscriber::unsubscribe ( const servus::Serializable &  serializable)

Unsubscribe a serializable object to stop applying updates from any connected publisher.

Parameters
serializable: the object to stop updating on receive()
Returns
true if removal of subscription was successful, false otherwise
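
The boolean results of subscribe() and unsubscribe() follow a simple contract in the class example: a second subscription to the same event fails, and so does removing a subscription that does not exist. The sketch below models that contract with a plain map. `SubscriptionTable`, `ToyEventId` and `ToyEventFunc` are hypothetical stand-ins (a 64-bit id replaces uint128_t to keep the block self-contained), and this is not ZeroEQ's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>

// Toy stand-ins for the event identifier and callback types.
using ToyEventId = std::uint64_t;
using ToyEventFunc = std::function<void()>;

// Toy subscription table; NOT the ZeroEQ implementation.
class SubscriptionTable
{
public:
    // Returns true if the event was newly registered,
    // false if a handler for it already existed.
    bool subscribe(const ToyEventId event, const ToyEventFunc& func)
    {
        return _handlers.emplace(event, func).second;
    }

    // Returns true if a registration was removed, false if none existed.
    bool unsubscribe(const ToyEventId event)
    {
        return _handlers.erase(event) == 1;
    }

    // Calls the handler registered for the event, if any;
    // returns whether a handler was called.
    bool dispatch(const ToyEventId event) const
    {
        const auto it = _handlers.find(event);
        if (it == _handlers.end())
            return false;
        it->second();
        return true;
    }

private:
    std::map<ToyEventId, ToyEventFunc> _handlers;
};
```

Checking the return value, as the tests do with BOOST_CHECK, is the only way to notice a rejected duplicate subscription in this model, since no exception is thrown.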
