ZeroEQ  0.8.0
ZeroEQ - Zero Event Queue
zeroeq::Receiver Class Referenceabstract

Base class for entities receiving data. More...

#include <receiver.h>

+ Inheritance diagram for zeroeq::Receiver:
+ Collaboration diagram for zeroeq::Receiver:

Public Member Functions

 Receiver ()
 Create a new standalone receiver. More...
 
 Receiver (Receiver &shared)
 Create a shared receiver. More...
 
virtual ~Receiver ()
 Destroy this receiver. More...
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers. More...
 

Protected Member Functions

virtual void addSockets (std::vector< detail::Socket > &entries)=0
 Add this receiver's sockets to the given list.
 
virtual void process (detail::Socket &socket, uint32_t timeout)=0
 Process data on a signalled socket. More...
 
virtual void update ()
 Update the internal connection list. More...
 
virtual void addConnection (const std::string &uri)
 Add the given connection to the list of receiving sockets. More...
 
void * getZMQContext ()
 

Friends

class detail::Receiver
 
class connection::detail::Broker
 

Detailed Description

Base class for entities receiving data.

Provides a receive() method, which demultiplexes data between multiple inputs of multiple instances of receivers. Receivers form a shared group by linking them at construction time.

Not intended to be as a final class. Not thread safe.

Example:

/* Copyright (c) 2014, Human Brain Project
* Stefan.Eilemann@epfl.ch
*/
#define BOOST_TEST_MODULE zeroeq_receiver
#include "broker.h"
#include <chrono>
bool gotOne = false;
bool gotTwo = false;
void onEvent1()
{
gotOne = true;
}
void onEvent2()
{
gotTwo = true;
}
void testReceive(zeroeq::Publisher& publisher, zeroeq::Receiver& receiver,
bool& var1, bool& var2, const int line)
{
gotOne = false;
gotTwo = false;
const auto startTime = std::chrono::high_resolution_clock::now();
for (;;)
{
BOOST_CHECK(publisher.publish(test::Echo(test::echoMessage)));
while (receiver.receive(100))
{
}
if (var1 && var2)
break;
const auto endTime = std::chrono::high_resolution_clock::now();
const auto elapsed =
std::chrono::nanoseconds(endTime - startTime).count() / 1000000;
if (elapsed > 2000 /*ms*/)
break;
}
BOOST_CHECK_MESSAGE(var1, (&var1 == &gotOne ? "Event 1" : "Event 2")
<< " not received (l." << line << ")");
if (&var1 != &var2)
BOOST_CHECK_MESSAGE(var2, (&var2 == &gotOne ? "Event 1" : "Event 2")
<< " not received (l." << line << ")");
}
void testReceive(zeroeq::Publisher& publisher, zeroeq::Receiver& receiver,
bool& var, const int line)
{
testReceive(publisher, receiver, var, var, line);
}
BOOST_AUTO_TEST_CASE(test_two_subscribers)
{
zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber1(test::buildURI("localhost", publisher));
zeroeq::Subscriber subscriber2(test::buildURI("localhost", publisher),
subscriber1);
BOOST_CHECK(subscriber1.subscribe(test::Echo::IDENTIFIER(),
zeroeq::EventFunc(&onEvent1)));
BOOST_CHECK(subscriber2.subscribe(test::Echo::IDENTIFIER(),
zeroeq::EventFunc(&onEvent2)));
testReceive(publisher, subscriber1, gotOne, gotTwo, __LINE__);
testReceive(publisher, subscriber2, gotOne, gotTwo, __LINE__);
}
BOOST_AUTO_TEST_CASE(test_publisher_routing)
{
zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Publisher silentPublisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber* subscriber1 =
new zeroeq::Subscriber(test::buildURI("localhost", silentPublisher));
zeroeq::Subscriber subscriber2(test::buildURI("localhost", publisher),
*subscriber1);
BOOST_CHECK(subscriber1->subscribe(test::Echo::IDENTIFIER(),
zeroeq::EventFunc(&onEvent1)));
BOOST_CHECK(subscriber2.subscribe(test::Echo::IDENTIFIER(),
zeroeq::EventFunc(&onEvent2)));
testReceive(publisher, *subscriber1, gotTwo, __LINE__);
BOOST_CHECK(!gotOne);
testReceive(publisher, subscriber2, gotTwo, __LINE__);
BOOST_CHECK(!gotOne);
delete subscriber1;
testReceive(publisher, subscriber2, gotTwo, __LINE__);
BOOST_CHECK(!gotOne);
}

Definition at line 42 of file receiver.h.

Constructor & Destructor Documentation

zeroeq::Receiver::Receiver ( )

Create a new standalone receiver.

zeroeq::Receiver::Receiver ( Receiver shared)
explicit

Create a shared receiver.

All receivers sharing a group may receive data when receive() is called on any of them.

Parameters
sharedanother receiver to form a simultaneous receive group with.
virtual zeroeq::Receiver::~Receiver ( )
virtual

Destroy this receiver.

Member Function Documentation

virtual void zeroeq::Receiver::addConnection ( const std::string &  uri)
protectedvirtual

Add the given connection to the list of receiving sockets.

Parameters
urithe ZeroMQ address to connect to.
virtual void zeroeq::Receiver::process ( detail::Socket &  socket,
uint32_t  timeout 
)
protectedpure virtual

Process data on a signalled socket.

Parameters
socketthe socket provided from addSockets().
timeoutuser provided timeout from receive().
bool zeroeq::Receiver::receive ( const uint32_t  timeout = TIMEOUT_INDEFINITE)

Receive at least one event from all shared receivers.

Using receive( 0 ) is equivalent to polling the receivers for data.

Parameters
timeouttimeout in ms for poll, default blocking poll until at least one event is received
Returns
true if at least one event was received
Exceptions
std::runtime_errorwhen polling failed.
virtual void zeroeq::Receiver::update ( )
inlineprotectedvirtual

Update the internal connection list.

Called on all members of a shared group regularly by receive() to update their list of sockets.

Definition at line 93 of file receiver.h.


The documentation for this class was generated from the following file: