// Connection broker to introduce remote publishers to a subscriber.
#define BOOST_TEST_MODULE zeroeq_connection_broker
#include "../common.h"
#include <zeroeq/connection/broker.h>
#include <servus/servus.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
/// Owning pointer to a broker; this is what the createBroker() calls in this
/// file hand back to Subscriber::run().
using BrokerPtr = std::unique_ptr<zeroeq::connection::Broker>;

/// Address of the broker most recently created in Subscriber::run(); cleared
/// at the start of the `broker` test case.
std::string _broker;
/**
 * Test helper whose run() method is executed on a std::thread by the test
 * cases in this file.
 *
 * run() creates a broker, stores its address in the file-level _broker
 * string, moves the internal state to STATE_STARTED and then polls the
 * public `received` flag. The wait/set methods let the test thread
 * synchronize with these state transitions.
 *
 * NOTE(review): parts of this class are not visible in this view -- the
 * opening of the statement in run() that ends with "})));" and the signature
 * of the member whose body starts at the bare "{" following _state. Confirm
 * the comments below against the complete file.
 */
class Subscriber
{
public:
// Starts with nothing received and in the STATE_CREATED state.
Subscriber()
: received(false)
, _state(STATE_CREATED)
{
}
// Virtual because the class is used as a base (NamedSubscriber,
// FailingNamedSubscriber).
virtual ~Subscriber() {}
// Thread entry point: create the broker, announce STATE_STARTED, then poll
// `received`.
void run()
{
// NOTE(review): the start of this statement is missing from the view. The
// visible tail references test::Echo::IDENTIFIER(), calls
// test::onEchoEvent(data, size) and sets `received` -- presumably the body
// of an event callback registered on a subscriber; confirm.
test::Echo::IDENTIFIER(),
test::onEchoEvent(data, size);
received = true;
})));
// NOTE(review): `subscriber` is not declared in the visible part of run().
BrokerPtr broker = createBroker(subscriber);
BOOST_REQUIRE(broker.get());
// Bail out without touching _state when no broker could be created.
if (!broker)
return;
// Store the broker address in the file-level _broker string.
_broker = broker->getAddress();
{
// Announce STATE_STARTED under the mutex and wake every waiter (see
// waitStarted()).
std::unique_lock<std::mutex> lock(_mutex);
_state = STATE_STARTED;
_condition.notify_all();
}
// Up to ten polls of `received`; returns as soon as it is true.
// NOTE(review): no receive or wait call is visible inside this loop --
// possibly missing from this view; confirm.
for (size_t i = 0; i < 10; ++i)
{
if (received)
return;
}
// !"reachable" is always false, so getting here records a failed check:
// the loop finished without `received` being set.
BOOST_CHECK(!"reachable");
}
// Blocks the caller until the state is at least STATE_STARTED.
void waitStarted() const
{
std::unique_lock<std::mutex> lock(_mutex);
// Loop guards against wake-ups that happen before the state has advanced.
while (_state < STATE_STARTED)
_condition.wait(lock);
}
// Moves the state to STATE_RUN and wakes every thread blocked in a wait method.
void setRun()
{
std::unique_lock<std::mutex> lock(_mutex);
_state = STATE_RUN;
_condition.notify_all();
}
// Blocks the caller until the state has reached STATE_RUN.
void waitRun() const
{
std::unique_lock<std::mutex> lock(_mutex);
while (_state < STATE_RUN)
_condition.wait(lock);
}
// Set to true in run(); the test cases also read and write it directly.
// NOTE(review): plain bool accessed from the test thread while run() executes
// on another thread, with no synchronization visible here -- consider
// std::atomic<bool>.
bool received;
protected:
// Both are mutable so the const wait methods can lock and wait on them.
mutable std::condition_variable _condition;
mutable std::mutex _mutex;
// Life-cycle states; the declaration order matters because the wait methods
// compare states with '<'.
enum State
{
STATE_CREATED,
STATE_STARTED,
STATE_RUN
} _state;
// NOTE(review): the signature of this member and most of its body are not
// visible; presumably the default createBroker() used by run() -- confirm.
{
return BrokerPtr(
}
};
// Runs a Subscriber on its own thread, waits until it reports STATE_STARTED,
// then publishes test::Echo events until the subscriber flags reception or
// ten attempts have been made.
// NOTE(review): `publisher` is used below but its declaration is not visible
// in this view.
BOOST_AUTO_TEST_CASE(broker)
{
// Expose the publisher through the file-level _publisher pointer for the
// duration of the test (reset at the end).
_publisher = &publisher;
_broker.clear();
Subscriber subscriber;
std::thread thread(std::bind(&Subscriber::run, &subscriber));
// Do not publish before the subscriber thread has signalled STATE_STARTED.
subscriber.waitStarted();
// At most ten publications, 100 ms apart; stops early once the subscriber
// has set `received`.
for (size_t i = 0; i < 10 && !subscriber.received; ++i)
{
BOOST_CHECK(publisher.
publish(test::Echo(test::echoMessage)));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
thread.join();
BOOST_CHECK(subscriber.received);
_publisher = 0;
}
/**
 * Subscriber variant parameterized on the broker's port selection mode; the
 * visible code passes `mode` together with the name
 * "zeroeq::connection::test_named_broker" when creating its broker.
 *
 * NOTE(review): the signature of the member below and the beginning of the
 * statement inside the try block are not visible in this view; presumably
 * this is the createBroker() override -- confirm against the complete file.
 */
template <zeroeq::connection::Broker::PortSelection mode>
class NamedSubscriber : public Subscriber
{
{
// The loop body runs at most ten times.
size_t nTries = 10;
while (nTries--)
{
try
{
// NOTE(review): only the tail of this statement is visible; presumably it
// returns a newly created named broker -- confirm.
"zeroeq::connection::test_named_broker", subscriber, mode));
}
catch (...)
{
// Every exception is swallowed so that the loop can go on.
}
// Block until setRun() has moved the state to STATE_RUN before the next
// iteration.
waitRun();
}
// Falling out of the loop yields an empty pointer.
return BrokerPtr();
}
};
/// NamedSubscriber instantiated with the PORT_FIXED port selection mode.
using FixedNamedSubscriber =
    NamedSubscriber<zeroeq::connection::Broker::PORT_FIXED>;

/// NamedSubscriber instantiated with the PORT_FIXED_OR_RANDOM port selection
/// mode.
using RandomNamedSubscriber =
    NamedSubscriber<zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM>;
// Starts a FixedNamedSubscriber and a RandomNamedSubscriber on separate
// threads, then publishes test::Echo events until the first subscriber flags
// reception or ten attempts have been made.
// NOTE(review): `publisher` is used below but its declaration is not visible
// in this view.
BOOST_AUTO_TEST_CASE(named_broker)
{
_publisher = &publisher;
FixedNamedSubscriber subscriber1;
std::thread thread1(std::bind(&Subscriber::run, &subscriber1));
// The first subscriber must have reached STATE_STARTED before the second
// one is launched.
subscriber1.waitStarted();
RandomNamedSubscriber subscriber2;
// Pre-set so that subscriber2's polling loop in run() returns on its first
// check.
subscriber2.received = true;
std::thread thread2(std::bind(&Subscriber::run, &subscriber2));
// Move both subscribers to STATE_RUN, releasing any thread blocked in
// waitRun().
subscriber1.setRun();
subscriber2.setRun();
// NOTE(review): only the tail of this statement is visible; its arguments are
// the host "127.0.0.1", the broker name and the publisher -- presumably a
// call that connects the publisher through the named broker; confirm.
"127.0.0.1", "zeroeq::connection::test_named_broker", publisher));
// At most ten publications, 100 ms apart; stops early once subscriber1 has
// set `received`.
for (size_t i = 0; i < 10 && !subscriber1.received; ++i)
{
BOOST_CHECK(publisher.
publish(test::Echo(test::echoMessage)));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
thread2.join();
thread1.join();
BOOST_CHECK(subscriber1.received);
_publisher = 0;
}
/**
 * Subscriber variant used by the named_broker_port_used test case.
 *
 * NOTE(review): the member signature and the beginnings of both statements
 * in its body are not visible in this view. The first visible tail names
 * std::runtime_error -- presumably the exception type expected by a throw
 * check around creating a broker named
 * "zeroeq::connection::test_named_broker"; confirm against the complete file.
 */
class FailingNamedSubscriber : public Subscriber
{
{
// NOTE(review): tail of a statement whose opening is not visible.
"zeroeq::connection::test_named_broker",
subscriber,
std::runtime_error);
// NOTE(review): second fragment; neither its opening nor its end is visible.
"zeroeq::connection::test_named_broker", subscriber,
}
};
// Runs a FixedNamedSubscriber and a FailingNamedSubscriber side by side; no
// events are published, both `received` flags are set by hand so that the
// polling loops in run() return.
// NOTE(review): `publisher` is used below but its declaration is not visible
// in this view.
BOOST_AUTO_TEST_CASE(named_broker_port_used)
{
// Skipped entirely when the TRAVIS environment variable is set.
if (getenv("TRAVIS"))
return;
_publisher = &publisher;
FixedNamedSubscriber subscriber1;
std::thread thread1(std::bind(&Subscriber::run, &subscriber1));
// subscriber1 must have reached STATE_STARTED before the second subscriber
// is launched.
subscriber1.waitStarted();
FailingNamedSubscriber subscriber2;
// Pre-set so that subscriber2's polling loop in run() returns on its first
// check.
subscriber2.received = true;
std::thread thread2(std::bind(&Subscriber::run, &subscriber2));
subscriber2.waitStarted();
// Let subscriber1's polling loop return as well, then move it to STATE_RUN.
subscriber1.received = true;
subscriber1.setRun();
thread2.join();
thread1.join();
_publisher = 0;
}
// NOTE(review): only the tail of the single statement in this test case is
// visible. It ends with the arguments `subscriber` and std::runtime_error --
// presumably a check that constructing a broker with invalid arguments throws
// std::runtime_error; confirm against the complete file.
BOOST_AUTO_TEST_CASE(invalid_broker)
{
subscriber),
std::runtime_error);
}