ZeroEQ  0.6.0
ZeroEQ - Zero Event Queue
zeroeq::connection::Broker Class Reference

Brokers subscription requests for a zeroeq::Receiver. More...

#include <broker.h>

+ Inheritance diagram for zeroeq::connection::Broker:
+ Collaboration diagram for zeroeq::connection::Broker:

Public Types

enum  PortSelection { PORT_FIXED, PORT_FIXED_OR_RANDOM }
 

Public Member Functions

 Broker (const std::string &name, Receiver &receiver, const PortSelection mode)
 Convenience constructor to create a new subscription broker. More...
 
 Broker (const std::string &address, Receiver &receiver)
 Create a new subscription broker. More...
 
 ~Broker ()
 Destroy this broker. More...
 
std::string getAddress () const
 
- Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver. More...
 
 Receiver (Receiver &shared)
 Create a shared receiver. More...
 
virtual ~Receiver ()
 Destroy this receiver. More...
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers. More...
 

Additional Inherited Members

- Protected Member Functions inherited from zeroeq::Receiver
virtual void addSockets (std::vector< detail::Socket > &entries)=0
 Add this receiver's sockets to the given list.
 
virtual void process (detail::Socket &socket, uint32_t timeout)=0
 Process data on a signalled socket. More...
 
virtual void update ()
 Update the internal connection list. More...
 
void * getZMQContext ()
 

Detailed Description

Brokers subscription requests for a zeroeq::Receiver.

Example:

/* Copyright (c) 2014-2016, Human Brain Project
* Stefan.Eilemann@epfl.ch
* Juan Hernando <jhernando@fi.upm.es>
*/
#define BOOST_TEST_MODULE zeroeq_connection_broker
#include "../broker.h"
#include <zeroeq/connection/broker.h>
#include <servus/servus.h>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
// Owning handle for a connection broker created by the test subscribers.
using BrokerPtr = std::unique_ptr< zeroeq::connection::Broker >;
// Address of the broker created by Subscriber::run(), read by the test cases.
std::string _broker;
// Publisher of the currently running test case; null outside of a test case.
zeroeq::Publisher* _publisher = nullptr;
/**
 * Test helper running a zeroeq::Subscriber together with a connection broker.
 *
 * run() is meant to be executed in its own thread. The test thread
 * synchronizes with it through waitStarted(), setRun() and waitRun(), which
 * share a mutex-protected state.
 */
class Subscriber
{
public:
    Subscriber()
        : received( false )
        , _state( STATE_CREATED )
    {}

    virtual ~Subscriber() {}

    /**
     * Subscribe to the echo event of the global publisher, create the broker
     * through createBroker(), store its address in _broker, signal
     * STATE_STARTED and then poll until the echo event was received or 100
     * receive attempts have been made.
     */
    void run()
    {
        zeroeq::Subscriber subscriber(
            test::buildURI( "127.0.0.1", *_publisher ));
        BOOST_CHECK( subscriber.subscribe( test::Echo::IDENTIFIER(),
            zeroeq::EventPayloadFunc([&]( const void* data, const size_t size )
                { test::onEchoEvent( data, size ); received = true; }) ));

        // Using the connection broker in place of zeroconf
        BrokerPtr broker = createBroker( subscriber );
        BOOST_REQUIRE( broker.get( ));
        if( !broker )
            return;

        _broker = broker->getAddress();
        {
            std::lock_guard< std::mutex > lock( _mutex );
            _state = STATE_STARTED;
            _condition.notify_all();
        }

        // test receive of data for echo event
        for( size_t i = 0; i < 100 && !received; ++i )
            subscriber.receive( 100 );
    }

    /** Block until run() has created its broker and signalled STATE_STARTED. */
    void waitStarted() const
    {
        std::unique_lock< std::mutex > lock( _mutex );
        while( _state < STATE_STARTED )
            _condition.wait( lock );
    }

    /** Switch to STATE_RUN and wake up all threads blocked in a wait method. */
    void setRun()
    {
        std::lock_guard< std::mutex > lock( _mutex );
        _state = STATE_RUN;
        _condition.notify_all();
    }

    /** Block until setRun() has been called. */
    void waitRun() const
    {
        std::unique_lock< std::mutex > lock( _mutex );
        while( _state < STATE_RUN )
            _condition.wait( lock );
    }

    // Set once the echo event arrived. Atomic because it is written from the
    // thread executing run() and read and written by the test thread without
    // holding _mutex.
    std::atomic< bool > received;

protected:
    mutable std::condition_variable _condition;
    mutable std::mutex _mutex;

    // Ordered life cycle; the wait methods rely on the enumerator order.
    enum State
    {
        STATE_CREATED,
        STATE_STARTED,
        STATE_RUN
    } _state;

    /**
     * Create the broker serving the given subscriber.
     *
     * The default implementation binds to 127.0.0.1 on port 0.
     * Subclasses override this to use named brokers.
     */
    virtual BrokerPtr createBroker( zeroeq::Subscriber& subscriber )
    {
        return BrokerPtr(
            new zeroeq::connection::Broker( "127.0.0.1:0", subscriber ));
    }
};
// Connects a publisher to a subscriber through an address-based broker and
// verifies that the echo event arrives.
BOOST_AUTO_TEST_CASE( broker )
{
    zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
    _publisher = &publisher;
    _broker.clear();

    Subscriber subscriber;
    std::thread worker( &Subscriber::run, &subscriber );
    // _broker is only valid once the worker has signalled that it started.
    subscriber.waitStarted();

    BOOST_CHECK( zeroeq::connection::Service::subscribe( _broker, publisher ));

    // Re-publish until the subscriber reports reception, at most 100 times.
    size_t attempt = 0;
    while( attempt < 100 && !subscriber.received )
    {
        BOOST_CHECK( publisher.publish( test::Echo( test::echoMessage )));
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
        ++attempt;
    }

    worker.join();
    BOOST_CHECK( subscriber.received );
    _publisher = nullptr;
}
/**
 * Subscriber using a named broker created with the given port selection mode.
 */
template< zeroeq::connection::Broker::PortSelection mode >
class NamedSubscriber : public Subscriber
{
    /**
     * Create the broker named "zeroeq::connection::test_named_broker".
     *
     * Up to ten attempts are made. After a failed attempt this blocks in
     * waitRun() before trying again.
     *
     * @return the broker, or an empty pointer if every attempt failed.
     */
    BrokerPtr createBroker( zeroeq::Subscriber& subscriber ) override
    {
        // Multiple instances of the test may run concurrently. Try until we get
        // the well-defined port
        size_t nTries = 10;
        while( nTries-- )
        {
            try
            {
                return BrokerPtr( new zeroeq::connection::Broker(
                    "zeroeq::connection::test_named_broker", subscriber, mode ));
            }
            catch( ... ) {} // deliberate: a failed construction means retry

            waitRun();
        }
        return BrokerPtr();
    }
};
// Named subscriber which only accepts the fixed port derived from the name.
using FixedNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED >;
// Named subscriber which falls back to a random port on failure.
using RandomNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM >;
// Connects a publisher to a subscriber through a named broker while a second
// subscriber creates a broker of the same name in PORT_FIXED_OR_RANDOM mode.
BOOST_AUTO_TEST_CASE( named_broker )
{
    zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
    _publisher = &publisher;

    FixedNamedSubscriber subscriber1;
    std::thread thread1( std::bind( &Subscriber::run, &subscriber1 ));
    subscriber1.waitStarted();

    RandomNamedSubscriber subscriber2;
    // Pre-set the flag so that subscriber2's run() skips its receive loop.
    subscriber2.received = true;
    std::thread thread2( std::bind( &Subscriber::run, &subscriber2 ));

    subscriber1.setRun();
    subscriber2.setRun();

    // Subscribe by host and broker name. The name has to match the one used
    // to create the named brokers.
    BOOST_CHECK( zeroeq::connection::Service::subscribe(
                     "127.0.0.1", "zeroeq::connection::test_named_broker",
                     publisher ));

    // Re-publish until subscriber1 reports reception, at most 100 times.
    for( size_t i = 0; i < 100 && !subscriber1.received; ++i )
    {
        BOOST_CHECK( publisher.publish( test::Echo( test::echoMessage )));
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
    }

    thread2.join();
    thread1.join();
    BOOST_CHECK( subscriber1.received );
    _publisher = 0;
}
/**
 * Subscriber whose broker creation first checks that a PORT_FIXED broker of
 * the test name cannot be constructed, then creates one in
 * PORT_FIXED_OR_RANDOM mode.
 */
class FailingNamedSubscriber : public Subscriber
{
    BrokerPtr createBroker( zeroeq::Subscriber& subscriber ) override
    {
        // Constructed as a temporary so that nothing is leaked should the
        // constructor unexpectedly succeed.
        BOOST_CHECK_THROW(
            zeroeq::connection::Broker( "zeroeq::connection::test_named_broker",
                                        subscriber,
                                        zeroeq::connection::Broker::PORT_FIXED ),
            std::runtime_error );

        return BrokerPtr( new zeroeq::connection::Broker(
            "zeroeq::connection::test_named_broker", subscriber,
            zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM ));
    }
};
// Runs a FixedNamedSubscriber and, once it has started, a
// FailingNamedSubscriber whose createBroker() expects the construction of a
// second broker with the same name to throw std::runtime_error.
BOOST_AUTO_TEST_CASE( named_broker_port_used )
{
// The test is skipped when the TRAVIS environment variable is set.
if( getenv( "TRAVIS" ))
return;
zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
_publisher = &publisher;
// Start subscriber1 and wait until it has created its broker.
FixedNamedSubscriber subscriber1;
std::thread thread1( std::bind( &Subscriber::run, &subscriber1 ));
subscriber1.waitStarted();
FailingNamedSubscriber subscriber2;
// Pre-set the flag so that subscriber2's run() skips its receive loop.
subscriber2.received = true;
std::thread thread2( std::bind( &Subscriber::run, &subscriber2 ));
// subscriber1 has to stay alive until subscriber2 has created its broker.
subscriber2.waitStarted();
// No event is published in this test: end subscriber1's receive loop by
// setting its flag directly.
subscriber1.received = true;
subscriber1.setRun();
thread2.join();
thread1.join();
_publisher = 0;
}
// Constructing a broker from the address "invalidIP" has to throw.
BOOST_AUTO_TEST_CASE( invalid_broker )
{
    // The subscriber only serves as the receiver handed to the broker.
    zeroeq::Subscriber subscriber( zeroeq::URI( "127.0.0.1:1234" ));
    const std::string badAddress( "invalidIP" );
    BOOST_CHECK_THROW( zeroeq::connection::Broker( badAddress, subscriber ),
                       std::runtime_error );
}

Definition at line 35 of file broker.h.

Member Enumeration Documentation

Enumerator
PORT_FIXED 

Use only the fixed port for named brokers.

PORT_FIXED_OR_RANDOM 

Fall back to a random port on failure.

Definition at line 38 of file broker.h.

Constructor & Destructor Documentation

zeroeq::connection::Broker::Broker ( const std::string &  name,
Receiver &  receiver,
const PortSelection  mode 
)

Convenience constructor to create a new subscription broker.

This constructor will try to bind to INADDR_ANY and a fixed port derived from the given name. If that fails, it will throw a std::runtime_error (if mode is PORT_FIXED) or allocate a random port (if mode is PORT_FIXED_OR_RANDOM).

The name should be a string of the application's namespace, e.g., "livre". The same string should be used by Service::subscribe(). A hashing algorithm is used to derive the port from the name. This yields a fixed port number, therefore only a single application per machine can run one broker. If the random port fallback is enabled, this constructor will allocate a random available port.

Parameters
name: the application namespace.
receiver: the Receiver to manage.
mode: the allocation strategy if the fixed port is unavailable.
Exceptions
std::runtime_error: when the zmq setup failed.
zeroeq::connection::Broker::Broker ( const std::string &  address,
Receiver &  receiver 
)

Create a new subscription broker.

The given receiver has to have at least the same lifetime as this broker. The receiver and broker are automatically shared.

For simplicity, only a single Receiver is handled by a Broker. The implementation should be extended if multiple receivers shall be handled.

Parameters
address: the zmq reply socket address to be used.
receiver: the Receiver to manage.
Exceptions
std::runtime_error: when the zmq setup failed.
zeroeq::connection::Broker::~Broker ( )

Destroy this broker.


The documentation for this class was generated from the following file: