ZeroEQ  0.8.0
ZeroEQ - Zero Event Queue
zeroeq::connection::Broker Class Reference

Brokers subscription requests for a zeroeq::Receiver. More...

#include <broker.h>

+ Inheritance diagram for zeroeq::connection::Broker:
+ Collaboration diagram for zeroeq::connection::Broker:

Public Types

enum  PortSelection { PORT_FIXED, PORT_FIXED_OR_RANDOM }
 

Public Member Functions

 Broker (const std::string &name, Receiver &receiver, const PortSelection mode)
 Convenience constructor to create a new subscription broker. More...
 
 Broker (const std::string &address, Receiver &receiver)
 Create a new subscription broker. More...
 
 ~Broker ()
 Destroy this broker. More...
 
std::string getAddress () const
 
- Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver. More...
 
 Receiver (Receiver &shared)
 Create a shared receiver. More...
 
virtual ~Receiver ()
 Destroy this receiver. More...
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers. More...
 

Additional Inherited Members

- Protected Member Functions inherited from zeroeq::Receiver
virtual void addSockets (std::vector< detail::Socket > &entries)=0
 Add this receiver's sockets to the given list.
 
virtual void process (detail::Socket &socket, uint32_t timeout)=0
 Process data on a signalled socket. More...
 
virtual void update ()
 Update the internal connection list. More...
 
virtual void addConnection (const std::string &uri)
 Add the given connection to the list of receiving sockets. More...
 
void * getZMQContext ()
 

Detailed Description

Brokers subscription requests for a zeroeq::Receiver.

Example:

/* Copyright (c) 2014-2016, Human Brain Project
* Stefan.Eilemann@epfl.ch
* Juan Hernando <jhernando@fi.upm.es>
*/
#define BOOST_TEST_MODULE zeroeq_connection_broker
#include "../broker.h"
#include <zeroeq/connection/broker.h>
#include <servus/servus.h>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
/// Owning handle for a connection broker created by the test subscribers.
using BrokerPtr = std::unique_ptr<zeroeq::connection::Broker>;

/// Address of the most recently created broker, as stored by Subscriber::run().
std::string _broker;

/// Publisher of the running test case; each test sets it and resets it to null.
zeroeq::Publisher* _publisher = nullptr;
/**
 * Test helper that drives a zeroeq::Subscriber and its connection broker from
 * a worker thread.
 *
 * run() is meant to be executed in a separate thread. The test thread
 * synchronises with it through waitStarted() / setRun() / waitRun(), which
 * share a mutex-protected state, and through the `received` flag.
 */
class Subscriber
{
public:
    Subscriber()
        : received(false)
        , _state(STATE_CREATED)
    {
    }
    virtual ~Subscriber() {}

    /**
     * Thread body: subscribe to the echo event, create the broker, publish its
     * address in the global _broker, signal STATE_STARTED and then poll the
     * subscriber until an event was received or 100 receive() calls were made.
     */
    void run()
    {
        zeroeq::Subscriber subscriber(test::buildURI("127.0.0.1", *_publisher));
        BOOST_CHECK(subscriber.subscribe(
            test::Echo::IDENTIFIER(),
            zeroeq::EventPayloadFunc([&](const void* data, const size_t size) {
                test::onEchoEvent(data, size);
                received = true;
            })));

        // Using the connection broker in place of zeroconf
        BrokerPtr broker = createBroker(subscriber);
        BOOST_REQUIRE(broker.get());
        if (!broker)
            return;

        // Written before the state change below, so a thread returning from
        // waitStarted() observes the final address.
        _broker = broker->getAddress();
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _state = STATE_STARTED;
            _condition.notify_all();
        }

        // test receive of data for echo event
        for (size_t i = 0; i < 100 && !received; ++i)
            subscriber.receive(100);
    }

    /** Block the caller until run() has reached at least STATE_STARTED. */
    void waitStarted() const
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (_state < STATE_STARTED)
            _condition.wait(lock);
    }

    /** Move to STATE_RUN and wake up every thread blocked in waitRun(). */
    void setRun()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _state = STATE_RUN;
        _condition.notify_all();
    }

    /** Block the caller until setRun() has been called. */
    void waitRun() const
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (_state < STATE_RUN)
            _condition.wait(lock);
    }

    /**
     * True once an echo event arrived; tests also set it directly to stop the
     * receive loop. Atomic because the worker thread and the test thread read
     * and write it concurrently without holding _mutex.
     */
    std::atomic<bool> received;

protected:
    mutable std::condition_variable _condition;
    mutable std::mutex _mutex;

    /** Progress of the worker; the values are ordered and compared with '<'. */
    enum State
    {
        STATE_CREATED,
        STATE_STARTED,
        STATE_RUN
    } _state;

    /**
     * Create the broker serving the given subscriber. The default
     * implementation uses the address "127.0.0.1:0"; subclasses override this
     * to exercise other Broker constructors.
     */
    virtual BrokerPtr createBroker(zeroeq::Subscriber& subscriber)
    {
        return BrokerPtr(
            new zeroeq::connection::Broker("127.0.0.1:0", subscriber));
    }
};
/**
 * A publisher subscribes through an address-based broker and keeps publishing
 * echo events until the subscriber thread reports reception.
 */
BOOST_AUTO_TEST_CASE(broker)
{
    zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
    _publisher = &publisher;
    _broker.clear();

    Subscriber subscriber;
    std::thread worker(&Subscriber::run, &subscriber);

    // run() stores the broker address in _broker before signalling "started"
    subscriber.waitStarted();
    BOOST_CHECK(zeroeq::connection::Service::subscribe(_broker, publisher));

    // Publish at most 100 times, pausing 100 ms between attempts
    size_t attempt = 0;
    while (attempt < 100 && !subscriber.received)
    {
        BOOST_CHECK(publisher.publish(test::Echo(test::echoMessage)));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        ++attempt;
    }

    worker.join();
    BOOST_CHECK(subscriber.received);
    _publisher = nullptr;
}
/**
 * Subscriber whose broker is created with the name-based Broker constructor,
 * using the port selection mode given as template argument.
 */
template <zeroeq::connection::Broker::PortSelection mode>
class NamedSubscriber : public Subscriber
{
    /**
     * Try up to ten times to create the named broker.
     * @return the broker, or an empty pointer if every attempt threw.
     */
    BrokerPtr createBroker(zeroeq::Subscriber& subscriber) override
    {
        // Multiple instances of the test may run concurrently. Try until we get
        // the well-defined port
        for (size_t remaining = 10; remaining > 0; --remaining)
        {
            try
            {
                BrokerPtr created(new zeroeq::connection::Broker(
                    "zeroeq::connection::test_named_broker", subscriber, mode));
                return created;
            }
            catch (...)
            {
                // Deliberately ignored: a failed construction leads to the
                // retry below.
            }
            // Do not retry before the test has called setRun()
            waitRun();
        }
        return BrokerPtr();
    }
};
/// Named subscriber that only accepts the fixed port.
using FixedNamedSubscriber =
    NamedSubscriber<zeroeq::connection::Broker::PORT_FIXED>;

/// Named subscriber that may fall back to a random port.
using RandomNamedSubscriber =
    NamedSubscriber<zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM>;
/**
 * Two subscribers create brokers from the same application name: the first
 * with PORT_FIXED, the second with PORT_FIXED_OR_RANDOM. The publisher then
 * subscribes by host and name and publishes until the first subscriber
 * reports reception.
 */
BOOST_AUTO_TEST_CASE(named_broker)
{
    zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
    _publisher = &publisher;

    FixedNamedSubscriber subscriber1;
    std::thread thread1(std::bind(&Subscriber::run, &subscriber1));
    subscriber1.waitStarted();

    RandomNamedSubscriber subscriber2;
    // Pre-set so that the second subscriber's receive loop ends immediately
    subscriber2.received = true;
    std::thread thread2(std::bind(&Subscriber::run, &subscriber2));

    // Release any createBroker() retry loop blocked in waitRun()
    subscriber1.setRun();
    subscriber2.setRun();

    // Subscribe by host and application name. The name has to be the same
    // string that was passed to the Broker constructor.
    BOOST_CHECK(zeroeq::connection::Service::subscribe(
        "127.0.0.1", "zeroeq::connection::test_named_broker", publisher));

    for (size_t i = 0; i < 100 && !subscriber1.received; ++i)
    {
        BOOST_CHECK(publisher.publish(test::Echo(test::echoMessage)));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    thread2.join();
    thread1.join();
    BOOST_CHECK(subscriber1.received);
    _publisher = 0;
}
/**
 * Subscriber used while another broker already owns the fixed port of the
 * test name: it checks that PORT_FIXED fails and then creates its broker with
 * the random port fallback.
 */
class FailingNamedSubscriber : public Subscriber
{
    BrokerPtr createBroker(zeroeq::Subscriber& subscriber) override
    {
        // A temporary is used instead of 'new' so that nothing leaks should
        // the constructor unexpectedly succeed.
        BOOST_CHECK_THROW(zeroeq::connection::Broker(
                              "zeroeq::connection::test_named_broker",
                              subscriber,
                              zeroeq::connection::Broker::PORT_FIXED),
                          std::runtime_error);

        return BrokerPtr(new zeroeq::connection::Broker(
            "zeroeq::connection::test_named_broker", subscriber,
            zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM));
    }
};
/**
 * While a first subscriber holds the fixed-port named broker, a second one
 * verifies through FailingNamedSubscriber that PORT_FIXED is rejected.
 * Skipped when the TRAVIS environment variable is set.
 */
BOOST_AUTO_TEST_CASE(named_broker_port_used)
{
    if (getenv("TRAVIS"))
        return;

    zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
    _publisher = &publisher;

    FixedNamedSubscriber subscriber1;
    std::thread firstWorker(&Subscriber::run, &subscriber1);
    subscriber1.waitStarted();

    FailingNamedSubscriber subscriber2;
    // No event is expected here: pre-set the flag so run() returns right
    // after the broker has been created.
    subscriber2.received = true;
    std::thread secondWorker(&Subscriber::run, &subscriber2);
    subscriber2.waitStarted();

    // Stop the first subscriber's receive loop as well
    subscriber1.received = true;
    subscriber1.setRun();

    secondWorker.join();
    firstWorker.join();
    _publisher = nullptr;
}
/** Constructing a broker from an unusable address must throw. */
BOOST_AUTO_TEST_CASE(invalid_broker)
{
    zeroeq::Subscriber subscriber(zeroeq::URI("127.0.0.1:1234"));
    const std::string badAddress("invalidIP");
    BOOST_CHECK_THROW(zeroeq::connection::Broker(badAddress, subscriber),
                      std::runtime_error);
}

Definition at line 38 of file broker.h.

Member Enumeration Documentation

Enumerator
PORT_FIXED 

Use only the fixed port for named brokers.

PORT_FIXED_OR_RANDOM 

Fall back to a random port on failure.

Definition at line 41 of file broker.h.

Constructor & Destructor Documentation

zeroeq::connection::Broker::Broker ( const std::string &  name,
Receiver &  receiver,
const PortSelection  mode 
)

Convenience constructor to create a new subscription broker.

This constructor will try to bind to INADDR_ANY and a fixed port derived from the given name. If that fails, it will throw a std::runtime_error (if mode is PORT_FIXED) or allocate a random port (if mode is PORT_FIXED_OR_RANDOM).

The name should be a string of the application's namespace, e.g., "livre". The same string should be used by Service::subscribe(). A hashing algorithm is used to derive the port from the name. This yields a fixed port number, therefore only a single application per machine can run one broker. If the random port fallback is enabled and the fixed port is unavailable, this constructor will allocate a random available port instead.

Parameters
name: the application namespace.
receiver: the Receiver to manage.
mode: the allocation strategy if the fixed port is unavailable.
Exceptions
std::runtime_error: when the zmq setup failed.
zeroeq::connection::Broker::Broker ( const std::string &  address,
Receiver &  receiver 
)

Create a new subscription broker.

The given receiver has to have at least the same lifetime as this broker. The receiver and broker are automatically shared.

For simplicity, only a single Receiver is handled by a Broker. The implementation should be extended if multiple receivers shall be handled.

Parameters
address: the zmq reply socket address to be used.
receiver: the Receiver to manage.
Exceptions
std::runtime_error: when the zmq setup failed.
zeroeq::connection::Broker::~Broker ( )

Destroy this broker.


The documentation for this class was generated from the following file: