ZeroEQ  0.6.0
ZeroEQ - Zero Event Queue
zeroeq::connection::Service Class Reference

Subscribes a Publisher to a remote receiver using a connection::Broker. More...

#include <service.h>

+ Collaboration diagram for zeroeq::connection::Service:

Static Public Member Functions

static bool subscribe (const std::string &address, const Publisher &publisher)
 Request subscription of the given publisher to a remote broker. More...
 
static bool subscribe (const std::string &hostname, const std::string &name, const Publisher &publisher)
 Request subscription of the given publisher to a named remote broker. More...
 

Detailed Description

Subscribes a Publisher to a remote receiver using a connection::Broker.

Example:

/* Copyright (c) 2014-2016, Human Brain Project
* Stefan.Eilemann@epfl.ch
* Juan Hernando <jhernando@fi.upm.es>
*/
#define BOOST_TEST_MODULE zeroeq_connection_broker
#include "../broker.h"
#include <zeroeq/connection/broker.h>
#include <servus/servus.h>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
/** Owning handle for the connection broker created by a test subscriber. */
using BrokerPtr = std::unique_ptr< zeroeq::connection::Broker >;

/** Address of the last broker created; set by Subscriber::run(). */
std::string _broker;

/** Publisher of the running test case; set and reset by each test case. */
zeroeq::Publisher* _publisher = nullptr;
/**
 * Test helper running a zeroeq::Subscriber and its connection broker.
 *
 * run() is meant to be executed in a worker thread. The test thread
 * synchronizes with it through waitStarted(), setRun() and waitRun(), and
 * observes delivery of the echo event through the public #received flag.
 */
class Subscriber
{
public:
    Subscriber()
        : received( false )
        , _state( STATE_CREATED )
    {}

    virtual ~Subscriber() {}

    /**
     * Subscribe to the echo event of the global publisher, create the broker
     * via createBroker(), publish its address in _broker, signal
     * STATE_STARTED and then poll the subscriber until the echo event was
     * received or 100 receive() calls were made.
     */
    void run()
    {
        zeroeq::Subscriber subscriber( test::buildURI( "127.0.0.1",
                                                       *_publisher ));
        BOOST_CHECK( subscriber.subscribe( test::Echo::IDENTIFIER(),
            zeroeq::EventPayloadFunc([&]( const void* data, const size_t size )
                { test::onEchoEvent( data, size ); received = true; }) ));

        // Using the connection broker in place of zeroconf
        BrokerPtr broker = createBroker( subscriber );
        // NOTE(review): BOOST_REQUIRE is expected to abort on failure, making
        // the null check below a fallback only - confirm behaviour when
        // raised from a non-test thread.
        BOOST_REQUIRE( broker.get( ));
        if( !broker )
            return;

        // _broker is written before the state change so that a thread
        // returning from waitStarted() sees the final address.
        _broker = broker->getAddress();
        {
            std::unique_lock< std::mutex > lock( _mutex );
            _state = STATE_STARTED;
            _condition.notify_all();
        }

        // test receive of data for echo event
        for( size_t i = 0; i < 100 && !received ; ++i )
            subscriber.receive( 100 );
    }

    /** Block until run() has reached at least STATE_STARTED. */
    void waitStarted() const
    {
        std::unique_lock< std::mutex > lock( _mutex );
        while( _state < STATE_STARTED )
            _condition.wait( lock );
    }

    /** Switch to STATE_RUN and wake up all threads blocked in a wait. */
    void setRun()
    {
        std::unique_lock< std::mutex > lock( _mutex );
        _state = STATE_RUN;
        _condition.notify_all();
    }

    /** Block until setRun() has been called. */
    void waitRun() const
    {
        std::unique_lock< std::mutex > lock( _mutex );
        while( _state < STATE_RUN )
            _condition.wait( lock );
    }

    /**
     * Set once the echo event arrived. Atomic because it is written by the
     * receive callback in the worker thread while the test thread reads and
     * presets it without holding _mutex.
     */
    std::atomic< bool > received;

protected:
    // mutable: the const wait functions need to lock and wait
    mutable std::condition_variable _condition;
    mutable std::mutex _mutex;

    /** Life cycle of the subscriber; values are ordered, see the waits. */
    enum State
    {
        STATE_CREATED,
        STATE_STARTED,
        STATE_RUN
    } _state;

    /**
     * Create the broker managing the given subscriber.
     *
     * The default implementation binds to 127.0.0.1 using port 0. Subclasses
     * override this to test other broker constructors.
     *
     * @param subscriber the subscriber handed to the broker.
     * @return the new broker.
     */
    virtual BrokerPtr createBroker( zeroeq::Subscriber& subscriber )
    {
        return BrokerPtr(
            new zeroeq::connection::Broker( "127.0.0.1:0", subscriber ));
    }
};
/**
 * Subscribe a publisher through an address-based broker and check that the
 * echo event reaches the subscriber.
 */
BOOST_AUTO_TEST_CASE( broker )
{
    zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
    _publisher = &publisher;
    _broker.clear();

    Subscriber subscriber;
    std::thread worker( [&subscriber] { subscriber.run(); } );

    // run() stores the broker address in _broker before signalling the start
    subscriber.waitStarted();

    BOOST_CHECK( zeroeq::connection::Service::subscribe( _broker, publisher ));

    // Publish every 100 ms, at most 100 times, until the echo was received
    size_t attempt = 0;
    while( attempt < 100 && !subscriber.received )
    {
        BOOST_CHECK( publisher.publish( test::Echo( test::echoMessage )));
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
        ++attempt;
    }

    worker.join();
    BOOST_CHECK( subscriber.received );
    _publisher = nullptr;
}
/**
 * Subscriber creating a named broker using the given port selection mode.
 *
 * @tparam mode the port selection passed to the Broker constructor.
 */
template< zeroeq::connection::Broker::PortSelection mode >
class NamedSubscriber : public Subscriber
{
    /**
     * Try up to ten times to create the named broker.
     *
     * @param subscriber the subscriber handed to the broker.
     * @return the new broker, or an empty pointer if all attempts failed.
     */
    BrokerPtr createBroker( zeroeq::Subscriber& subscriber ) override
    {
        // Multiple instances of the test may run concurrently. Try until we get
        // the well-defined port
        size_t nTries = 10;
        while( nTries-- )
        {
            try
            {
                return BrokerPtr(
                    new zeroeq::connection::Broker(
                        "zeroeq::connection::test_named_broker", subscriber,
                        mode ));
            }
            catch( ... ) {} // deliberate: a failed attempt is retried below

            // Blocks until the test calls setRun() before trying again
            waitRun();
        }
        return BrokerPtr();
    }
};
/** Named subscriber whose broker is created with PORT_FIXED. */
using FixedNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED >;

/** Named subscriber whose broker is created with PORT_FIXED_OR_RANDOM. */
using RandomNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM >;
/**
 * Subscribe a publisher through a named broker, addressed by host name and
 * application name, and check that the echo event reaches the subscriber
 * owning the fixed-port broker.
 */
BOOST_AUTO_TEST_CASE( named_broker )
{
    zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
    _publisher = &publisher;

    FixedNamedSubscriber subscriber1;
    std::thread thread1( std::bind( &Subscriber::run, &subscriber1 ));
    subscriber1.waitStarted();

    RandomNamedSubscriber subscriber2;
    // Preset so that the receive loop of subscriber2 ends immediately
    subscriber2.received = true;
    std::thread thread2( std::bind( &Subscriber::run, &subscriber2 ));

    // Release subscribers which wait in createBroker() after a failed attempt
    subscriber1.setRun();
    subscriber2.setRun();

    BOOST_CHECK( zeroeq::connection::Service::subscribe(
                     "127.0.0.1", "zeroeq::connection::test_named_broker",
                     publisher ));

    // Publish every 100 ms, at most 100 times, until the echo was received
    for( size_t i = 0; i < 100 && !subscriber1.received; ++i )
    {
        BOOST_CHECK( publisher.publish( test::Echo( test::echoMessage )));
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
    }

    thread2.join();
    thread1.join();
    BOOST_CHECK( subscriber1.received );
    _publisher = 0;
}
/**
 * Subscriber used while another broker already owns the named port.
 *
 * It checks that a fixed-port broker cannot be created and then creates one
 * which may use a random port instead.
 */
class FailingNamedSubscriber : public Subscriber
{
    /**
     * @param subscriber the subscriber handed to the broker.
     * @return a broker created with PORT_FIXED_OR_RANDOM.
     */
    BrokerPtr createBroker( zeroeq::Subscriber& subscriber ) override
    {
        // A temporary instead of a heap allocation: nothing is leaked should
        // the constructor unexpectedly succeed.
        BOOST_CHECK_THROW(
            zeroeq::connection::Broker(
                "zeroeq::connection::test_named_broker", subscriber,
                zeroeq::connection::Broker::PORT_FIXED ),
            std::runtime_error );

        return BrokerPtr(
            new zeroeq::connection::Broker(
                "zeroeq::connection::test_named_broker", subscriber,
                zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM ));
    }
};
/**
 * Run a FailingNamedSubscriber while a FixedNamedSubscriber is started, i.e.
 * after the first subscriber has created its broker.
 */
BOOST_AUTO_TEST_CASE( named_broker_port_used )
{
    // The test is skipped when the TRAVIS environment variable is set
    const bool onTravis = getenv( "TRAVIS" ) != nullptr;
    if( onTravis )
        return;

    zeroeq::Publisher publisher( zeroeq::NULL_SESSION );
    _publisher = &publisher;

    FixedNamedSubscriber subscriber1;
    std::thread thread1( [&subscriber1] { subscriber1.run(); } );
    subscriber1.waitStarted();

    FailingNamedSubscriber subscriber2;
    // No event is published in this test: preset the flag so that the
    // receive loop of the subscriber ends immediately
    subscriber2.received = true;
    std::thread thread2( [&subscriber2] { subscriber2.run(); } );
    subscriber2.waitStarted();

    // Both brokers were created; let the first subscriber finish as well
    subscriber1.received = true;
    subscriber1.setRun();

    thread2.join();
    thread1.join();
    _publisher = nullptr;
}
/** Creating a broker for the address "invalidIP" has to throw. */
BOOST_AUTO_TEST_CASE( invalid_broker )
{
    const zeroeq::URI subscriberURI( "127.0.0.1:1234" );
    zeroeq::Subscriber subscriber( subscriberURI );

    const std::string invalidAddress( "invalidIP" );
    BOOST_CHECK_THROW(
        zeroeq::connection::Broker( invalidAddress, subscriber ),
        std::runtime_error );
}

Definition at line 23 of file service.h.

Member Function Documentation

static bool zeroeq::connection::Service::subscribe ( const std::string &  address,
const Publisher &  publisher 
)
static

Request subscription of the given publisher to a remote broker.

Upon success, the Broker will add the publisher's address to its managed Subscriber.

Parameters
address: the broker address (hostname:port), without the protocol.
publisher: the publisher to subscribe to.
Returns
true if the subscription was successful, false on error.
static bool zeroeq::connection::Service::subscribe ( const std::string &  hostname,
const std::string &  name,
const Publisher &  publisher 
)
static

Request subscription of the given publisher to a named remote broker.

Upon success, the Broker will add the publisher's address to its managed Subscriber. The broker port is derived using the same hashing algorithm as in the corresponding Broker constructor.

Parameters
hostname: the broker address, without the protocol and port.
name: the application namespace.
publisher: the publisher to subscribe to.
Returns
true if the subscription was successful, false on error.

The documentation for this class was generated from the following file: