#define BOOST_TEST_MODULE zeroeq_connection_broker
#include "../broker.h"
#include <zeroeq/connection/broker.h>
#include <servus/servus.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
// Owning smart pointer to a connection broker, as returned by the
// createBroker() helpers used in this file.
using BrokerPtr = std::unique_ptr< zeroeq::connection::Broker >;

// Broker address shared between threads: cleared by the `broker` test case and
// assigned from Broker::getAddress() in Subscriber::run().
std::string _broker;
// Test helper whose run() method is executed on a worker thread by the test
// cases in this file. It exposes a small CREATED -> STARTED -> RUN state
// machine, protected by _mutex and signalled through _condition, so that the
// test thread and the worker thread can wait for each other.
//
// NOTE(review): several statements of this class look truncated in this view
// (the subscribe() callback header, the body of the receive loop, the
// declaration of the `subscriber` member and the createBroker() signature).
// Confirm against the complete file before changing any code here.
class Subscriber
{
public:
// Starts in STATE_CREATED with no event received yet.
Subscriber()
: received( false )
, _state( STATE_CREATED )
{}
virtual ~Subscriber() {}
// Thread entry point: registers the Echo handler, creates the broker,
// publishes its address in the global _broker, signals STATE_STARTED and then
// loops until an event was received or 100 iterations have passed.
void run()
{
// Register a handler for test::Echo events. The visible handler body forwards
// the payload to test::onEchoEvent() and then sets `received`.
// NOTE(review): the opening of the callback (its capture and parameter list)
// is not visible here.
BOOST_CHECK( subscriber.
subscribe( test::Echo::IDENTIFIER(),
{ test::onEchoEvent( data, size ); received = true; }) ));
// createBroker() supplies the broker for this subscriber; derived helper
// classes in this file provide their own broker-creation bodies.
BrokerPtr broker = createBroker( subscriber );
BOOST_REQUIRE( broker.get( ));
// Without a broker there is nothing to run. Note that this returns before
// STATE_STARTED is signalled, so a caller blocked in waitStarted() is not
// woken up on this path.
if( !broker )
return;
_broker = broker->getAddress();
// Publish the state change under the lock and wake every waiter.
{
std::unique_lock< std::mutex > lock( _mutex );
_state = STATE_STARTED;
_condition.notify_all();
}
// Poll at most 100 times, stopping early once `received` is set.
// NOTE(review): the loop body is not visible in this view.
for( size_t i = 0; i < 100 && !received ; ++i )
}
// Blocks the caller until the state has reached at least STATE_STARTED.
// The loop re-checks the state after every wake-up of the condition variable.
void waitStarted() const
{
std::unique_lock< std::mutex > lock( _mutex );
while( _state < STATE_STARTED )
_condition.wait( lock );
}
// Moves the state to STATE_RUN and wakes every thread blocked in waitRun()
// or waitStarted().
void setRun()
{
std::unique_lock< std::mutex > lock( _mutex );
_state = STATE_RUN;
_condition.notify_all();
}
// Blocks the caller until the state has reached STATE_RUN.
void waitRun() const
{
std::unique_lock< std::mutex > lock( _mutex );
while( _state < STATE_RUN )
_condition.wait( lock );
}
// Set by the Echo handler registered in run(); the test cases also read and
// write it directly.
// NOTE(review): this is a plain bool accessed from the test thread and the
// worker thread without taking _mutex — consider std::atomic< bool >.
bool received;
protected:
// Both are mutable so the const wait functions can lock and wait.
mutable std::condition_variable _condition;
mutable std::mutex _mutex;
// Lifecycle states. Their declaration order matters because the wait
// functions compare states with `<`.
enum State
{
STATE_CREATED,
STATE_STARTED,
STATE_RUN
} _state;
// NOTE(review): this looks like the body of the createBroker() factory called
// from run(); its signature and the rest of the return expression are not
// visible in this view.
{
return BrokerPtr(
}
};
// Runs a Subscriber on a worker thread and keeps publishing Echo events until
// the subscriber reports that it received one (at most 100 attempts).
BOOST_AUTO_TEST_CASE( broker )
{
    _publisher = &publisher;
    _broker.clear();

    Subscriber subscriber;
    // Execute Subscriber::run() on its own thread and wait until it has
    // signalled that its broker is up.
    std::thread worker( &Subscriber::run, &subscriber );
    subscriber.waitStarted();

    // Publish, pause 100 ms, and retry until the event got through.
    size_t attempt = 0;
    while( attempt < 100 && !subscriber.received )
    {
        BOOST_CHECK( publisher.publish( test::Echo( test::echoMessage )));
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
        ++attempt;
    }

    worker.join();
    BOOST_CHECK( subscriber.received );
    _publisher = 0;
}
// Subscriber variant that creates a broker registered under the name
// "zeroeq::connection::test_named_broker", using the port selection `mode`
// given as template parameter.
//
// NOTE(review): the header of the member function below (presumably the
// createBroker() implementation used by Subscriber::run()) and the start of
// the broker construction expression are not visible in this view.
template< zeroeq::connection::Broker::PortSelection mode >
class NamedSubscriber : public Subscriber
{
{
// Make up to 10 attempts to create the named broker.
size_t nTries = 10;
while( nTries-- )
{
// A successful attempt returns immediately; any exception thrown by the
// attempt is swallowed so that the loop can try again.
try
{
return BrokerPtr(
"zeroeq::connection::test_named_broker", subscriber, mode ));
}
catch( ... ) {}
// Before retrying, wait until the state has reached STATE_RUN (see
// Subscriber::setRun()); once it has, later iterations no longer block here.
waitRun();
}
// Every attempt failed: hand back an empty pointer.
return BrokerPtr();
}
};
// NamedSubscriber instantiated with the PORT_FIXED port selection.
using FixedNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED >;

// NamedSubscriber instantiated with the PORT_FIXED_OR_RANDOM port selection.
using RandomNamedSubscriber =
    NamedSubscriber< zeroeq::connection::Broker::PORT_FIXED_OR_RANDOM >;
// Starts two named subscribers (PORT_FIXED first, then PORT_FIXED_OR_RANDOM)
// on worker threads and publishes Echo events until the first subscriber
// reports that it received one (at most 100 attempts).
// NOTE(review): the declaration of `publisher` is not visible in this view.
BOOST_AUTO_TEST_CASE( named_broker )
{
_publisher = &publisher;
FixedNamedSubscriber subscriber1;
std::thread thread1( std::bind( &Subscriber::run, &subscriber1 ));
// Continue only once the first subscriber has signalled STATE_STARTED.
subscriber1.waitStarted();
RandomNamedSubscriber subscriber2;
// Pre-set the flag so that the `!received` loop in Subscriber::run() ends
// immediately for the second subscriber.
subscriber2.received = true;
std::thread thread2( std::bind( &Subscriber::run, &subscriber2 ));
// Release any broker-creation retry that is blocked in waitRun().
subscriber1.setRun();
subscriber2.setRun();
// NOTE(review): the next two lines are the tail of an argument list (host
// address, broker name, publisher); the start of the statement is not visible
// in this view.
"127.0.0.1", "zeroeq::connection::test_named_broker",
publisher ));
// Publish, pause 100 ms, and retry until subscriber1 has received the event.
for( size_t i = 0; i < 100 && !subscriber1.received; ++i )
{
BOOST_CHECK( publisher.
publish( test::Echo( test::echoMessage )));
std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
}
thread2.join();
thread1.join();
BOOST_CHECK( subscriber1.received );
_publisher = 0;
}
// Subscriber variant used by the named_broker_port_used test case: it first
// checks that an expression involving `subscriber` throws std::runtime_error
// and then returns a broker created under the name
// "zeroeq::connection::test_named_broker".
//
// NOTE(review): the member function header (presumably the createBroker()
// implementation), the expression under test inside BOOST_CHECK_THROW and the
// trailing arguments of the returned broker are not visible in this view.
class FailingNamedSubscriber : public Subscriber
{
{
// The expression checked here is expected to fail with std::runtime_error.
BOOST_CHECK_THROW(
subscriber,
std::runtime_error );
// Afterwards a broker is still returned to the caller.
return BrokerPtr(
"zeroeq::connection::test_named_broker", subscriber,
}
};
// Starts a fixed-port named subscriber and, while it is still running, a
// FailingNamedSubscriber that performs its own checks during broker creation.
// No events are published; both `received` flags are set by hand so that the
// worker loops terminate.
BOOST_AUTO_TEST_CASE( named_broker_port_used )
{
    // The whole test is skipped when the TRAVIS environment variable is set.
    const bool onTravis = getenv( "TRAVIS" ) != nullptr;
    if( onTravis )
        return;

    _publisher = &publisher;

    // First subscriber: wait until its broker has been started.
    FixedNamedSubscriber first;
    std::thread firstWorker( &Subscriber::run, &first );
    first.waitStarted();

    // Second subscriber: flag it as done up front so its run() loop exits
    // immediately, then wait until it has started as well.
    FailingNamedSubscriber second;
    second.received = true;
    std::thread secondWorker( &Subscriber::run, &second );
    second.waitStarted();

    // Let the first subscriber finish too.
    first.received = true;
    first.setRun();

    secondWorker.join();
    firstWorker.join();
    _publisher = 0;
}
// Expects a std::runtime_error from an expression that involves `subscriber`.
// NOTE(review): the beginning of the checked statement (the macro name, the
// expression that should throw and the declaration of `subscriber`) is not
// visible in this view.
BOOST_AUTO_TEST_CASE( invalid_broker )
{
subscriber ),
std::runtime_error );
}