ZeroEQ  0.6.0
ZeroEQ - Zero Event Queue
zeroeq::http::Server Class Reference

Serves HTTP GET and PUT requests for servus::Serializable objects. More...

#include <server.h>

+ Inheritance diagram for zeroeq::http::Server:
+ Collaboration diagram for zeroeq::http::Server:

Public Member Functions

Object registration for PUT and GET requests
bool add (servus::Serializable &object)
 Subscribe and register the given object. More...
 
bool remove (const servus::Serializable &object)
 Unsubscribe and unregister the given object. More...
 
bool subscribe (servus::Serializable &object)
 Subscribe a serializable object to receive updates from HTTP PUT requests. More...
 
bool subscribe (const std::string &event, const PUTFunc &func)
 Subscribe an event to receive HTTP PUT requests. More...
 
bool subscribe (const std::string &event, const PUTPayloadFunc &func)
 Subscribe an event to receive HTTP PUT requests with payload. More...
 
bool unsubscribe (const servus::Serializable &object)
 Unsubscribe the given object to stop applying updates. More...
 
bool unsubscribe (const std::string &event)
 Unsubscribe the given event to stop receiving PUT requests. More...
 
bool register_ (servus::Serializable &object)
 Subscribe a serializable object to serve HTTP GET requests. More...
 
bool register_ (const std::string &event, const GETFunc &func)
 Subscribe an event to serve HTTP GET requests. More...
 
bool unregister (const servus::Serializable &object)
 Unsubscribe the given object for GET requests. More...
 
bool unregister (const std::string &event)
 Unsubscribe the given event name for GET requests. More...
 
- Public Member Functions inherited from zeroeq::Receiver
 Receiver ()
 Create a new standalone receiver. More...
 
 Receiver (Receiver &shared)
 Create a shared receiver. More...
 
virtual ~Receiver ()
 Destroy this receiver. More...
 
bool receive (const uint32_t timeout=TIMEOUT_INDEFINITE)
 Receive at least one event from all shared receivers. More...
 

Setup

 Server (const URI &uri, Receiver &shared)
 Construct a new HTTP server. More...
 
 Server (const URI &uri)
 
 Server (Receiver &shared)
 
 Server ()
 
 Server (Server &shared)
 
virtual ~Server ()
 
const URIgetURI () const
 Get the publisher URI. More...
 
SocketDescriptor getSocketDescriptor () const
 Get the underlying socket descriptor. More...
 
static std::unique_ptr< Serverparse (int argc, const char *const *argv)
 Create a new Server when requested. More...
 
static std::unique_ptr< Serverparse (int argc, const char *const *argv, Receiver &shared)
 

Additional Inherited Members

- Protected Member Functions inherited from zeroeq::Receiver
virtual void update ()
 Update the internal connection list. More...
 
void * getZMQContext ()
 

Detailed Description

Serves HTTP GET and PUT requests for servus::Serializable objects.

Behaves semantically like a Publisher (for GET) and Subscriber (for PUT), except uses HTTP with JSON payload as the protocol. Requests are served synchronously (as per HTTP spec). Objects are available under their Serializable::getTypeName(), with '::' replaced by '/'. The REST API is case insensitive. For example, zerobuf::render::Camera is served at 'GET|PUT [uri]/zerobuf/render/camera'.

Not thread safe.

Example:

/* Copyright (c) 2016, Human Brain Project
* Stefan.Eilemann@epfl.ch
*/
#define BOOST_TEST_MODULE http_server
#include <zeroeq/http/server.h>
#include <zeroeq/subscriber.h>
#include <zeroeq/uri.h>
#include <zmq.h>
#include <servus/serializable.h>
#include <servus/uri.h>
#include <boost/test/unit_test.hpp>
#include <thread>
namespace
{
const std::string jsonGet( "Not JSON, just want to see that the is data a-ok" );
const std::string jsonPut( "See what my stepbrother jsonGet says" );
const std::string cors_headers(
"Access-Control-Allow-Headers: Content-Type\r\n"
"Access-Control-Allow-Methods: GET,PUT,OPTIONS\r\n"
"Access-Control-Allow-Origin: *\r\n");
const std::string error400(
"HTTP/1.0 400 Bad Request\r\n" + cors_headers +
"Content-Length: 28\r\n\r\nHTTP/1.0 400 Bad Request\r\n\r\n" );
const std::string error404(
"HTTP/1.0 404 Not Found\r\n" + cors_headers +
"Content-Length: 26\r\n\r\nHTTP/1.0 404 Not Found\r\n\r\n" );
const std::string error405(
"HTTP/1.0 405 Method Not Allowed\r\n" + cors_headers +
"Content-Length: 35\r\n\r\nHTTP/1.0 405 Method Not Allowed\r\n\r\n" );
const std::string error411(
"HTTP/1.0 411 Length Required\r\n" + cors_headers +
"Content-Length: 32\r\n\r\nHTTP/1.0 411 Length Required\r\n\r\n" );
class Foo : public servus::Serializable
{
public:
Foo() { _notified = false; }
void setNotified() { _notified = true; }
bool getNotified() const { return _notified; }
private:
std::string getTypeName() const final { return "test::Foo"; }
virtual zeroeq::uint128_t getTypeIdentifier() const final
{ return servus::make_uint128( getTypeName( )); }
bool _fromJSON( const std::string& json ) final
{
return jsonPut == json;
}
std::string _toJSON() const final
{
return jsonGet;
}
bool _notified;
};
class Client
{
public:
Client( const zeroeq::URI& uri )
: _ctx( ::zmq_ctx_new ( ))
, _socket( ::zmq_socket( _ctx, ZMQ_STREAM ))
{
if( ::zmq_connect( _socket, std::to_string( uri ).c_str( )) == -1 )
throw std::runtime_error( "Connect failed" );
// Get server identity
BOOST_CHECK_EQUAL( zmq_getsockopt( _socket, ZMQ_IDENTITY, _id,
&_idSize ), 0 );
}
~Client()
{
if( _socket )
::zmq_close( _socket );
if( _ctx )
::zmq_ctx_destroy( _ctx );
}
void sendRequest( const std::string& request )
{
if( ::zmq_send( _socket, _id, _idSize, ZMQ_SNDMORE ) != int(_idSize) ||
::zmq_send( _socket, request.c_str(), request.length(), 0 ) !=
int( request.length( )))
{
throw std::runtime_error( "Send failed" );
}
}
void test( const std::string& request, const std::string& expected,
const int line )
{
sendRequest( request );
std::string response;
while( response.size() < expected.size( ))
{
if( ::zmq_recv( _socket, _id, _idSize, 0 ) != int( _idSize ))
throw std::runtime_error( "Recv failed" );
char msg[256];
const int read = ::zmq_recv( _socket, msg, sizeof( msg ), 0 );
BOOST_REQUIRE_GE( read, 0 );
response.append( msg, read );
}
BOOST_CHECK_MESSAGE( response == expected,
"At l." + std::to_string( line ) + ": " +
response + " != " + expected);
}
private:
void* _ctx;
void* _socket;
uint8_t _id[256];
size_t _idSize = sizeof( _id );
};
}
BOOST_AUTO_TEST_CASE(construction)
{
BOOST_CHECK_NE( server1.getURI().getHost(), "" );
BOOST_CHECK_NE( server1.getURI().getHost(), "*" );
BOOST_CHECK_NE( server1.getURI().getPort(), 0 );
BOOST_CHECK_NO_THROW( server1.getSocketDescriptor( ));
BOOST_CHECK_GT( server1.getSocketDescriptor(), 0 );
const zeroeq::URI uri( "tcp://" );
zeroeq::http::Server server2( uri );
zeroeq::http::Server server3( uri );
BOOST_CHECK_NE( server2.getURI(), server3.getURI( ));
BOOST_CHECK_NE( server2.getURI().getPort(), 0 );
BOOST_CHECK_THROW( zeroeq::http::Server( server2.getURI( )),
std::runtime_error );
BOOST_CHECK_NO_THROW( server2.getSocketDescriptor( ));
BOOST_CHECK_GT( server1.getSocketDescriptor(), 0 );
}
BOOST_AUTO_TEST_CASE(construction_argv_host_port)
{
const char* app = boost::unit_test::framework::master_test_suite().argv[0];
const char* argv[] = { app, "--zeroeq-http-server", "127.0.0.1:0" };
const int argc = sizeof(argv)/sizeof(char*);
std::unique_ptr< zeroeq::http::Server > server1 =
BOOST_REQUIRE( server1 );
BOOST_CHECK_EQUAL( server1->getURI().getHost(), "127.0.0.1" );
BOOST_CHECK_NE( server1->getURI().getPort(), 0 );
std::unique_ptr< zeroeq::http::Server > server2 =
zeroeq::http::Server::parse( argc, argv, shared );
BOOST_REQUIRE( server2 );
BOOST_CHECK_EQUAL( server2->getURI().getHost(), "127.0.0.1" );
BOOST_CHECK_NE( server2->getURI().getPort(), 0 );
}
BOOST_AUTO_TEST_CASE(construction_argv)
{
const char* app = boost::unit_test::framework::master_test_suite().argv[0];
const char* argv[] = { app, "--zeroeq-http-server" };
const int argc = sizeof(argv)/sizeof(char*);
std::unique_ptr< zeroeq::http::Server > server1 =
BOOST_REQUIRE( server1 );
BOOST_CHECK( !server1->getURI().getHost().empty( ));
BOOST_CHECK_NE( server1->getURI().getPort(), 0 );
std::unique_ptr< zeroeq::http::Server > server2 =
zeroeq::http::Server::parse( argc, argv, shared );
BOOST_CHECK( !server2->getURI().getHost().empty( ));
BOOST_CHECK_NE( server2->getURI().getPort(), 0 );
}
BOOST_AUTO_TEST_CASE(construction_empty_argv)
{
const char* app = boost::unit_test::framework::master_test_suite().argv[0];
const char* argv[] = { app };
const int argc = sizeof(argv)/sizeof(char*);
std::unique_ptr< zeroeq::http::Server > server1 =
BOOST_CHECK( !server1 );
std::unique_ptr< zeroeq::http::Server > server2 =
zeroeq::http::Server::parse( argc, argv, shared );
BOOST_CHECK( !server2 );
}
BOOST_AUTO_TEST_CASE(registration)
{
Foo foo;
BOOST_CHECK( server.register_( foo ));
BOOST_CHECK( !server.register_( foo ));
BOOST_CHECK( server.unregister( foo ));
BOOST_CHECK( !server.unregister( foo ));
BOOST_CHECK( server.register_( "foo", [](){ return "bla"; } ));
BOOST_CHECK( !server.register_( "foo", [](){ return "bla"; } ));
BOOST_CHECK( server.unregister( "foo" ));
BOOST_CHECK( !server.unregister( "foo" ));
BOOST_CHECK( server.subscribe( foo ));
BOOST_CHECK( !server.subscribe( foo ));
BOOST_CHECK( server.unsubscribe( foo ));
BOOST_CHECK( !server.unsubscribe( foo ));
BOOST_CHECK( server.subscribe( "foo", zeroeq::PUTPayloadFunc
([]( const std::string& ) { return true; })));
BOOST_CHECK( !server.subscribe( "foo", zeroeq::PUTPayloadFunc
([]( const std::string& ) { return true; })));
BOOST_CHECK( server.unsubscribe( "foo" ));
BOOST_CHECK( !server.unsubscribe( "foo" ));
BOOST_CHECK( server.add( foo ));
BOOST_CHECK( !server.add( foo ));
BOOST_CHECK( server.remove( foo ));
BOOST_CHECK( !server.remove( foo ));
}
BOOST_AUTO_TEST_CASE(get_serializable)
{
bool running = true;
Foo foo;
foo.registerSerializeCallback( [&]{ foo.setNotified(); });
server.register_( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( "GET /unknown HTTP/1.0\r\n\r\n", error404, __LINE__ );
BOOST_CHECK( !foo.getNotified( ));
client.test( "GET /test/Foo HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" + cors_headers +
"Content-Length: 48\r\n\r\n" )+
jsonGet, __LINE__ );
BOOST_CHECK( foo.getNotified( ));
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(get_event)
{
bool running = true;
bool requested = false;
server.register_( "test::Foo", [&](){ requested = true; return jsonGet; } );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( "GET /unknown HTTP/1.0\r\n\r\n", error404, __LINE__ );
BOOST_CHECK( !requested );
client.test( "GET /test/Foo HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" + cors_headers +
"Content-Length: 48\r\n\r\n" )+
jsonGet, __LINE__ );
BOOST_CHECK( requested );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(shared)
{
bool running = true;
zeroeq::Subscriber subscriber;
zeroeq::http::Server server1( subscriber );
zeroeq::http::Server server2( server1 );
Foo foo;
server2.register_( foo );
std::thread thread( [&]() { while( running ) subscriber.receive( 100 );});
Client client1( server1.getURI( ));
Client client2( server2.getURI( ));
client1.test( "GET /test/Foo HTTP/1.0\r\n\r\n", error404, __LINE__ );
client2.test( "GET /test/Foo HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" + cors_headers +
"Content-Length: 48\r\n\r\n" ) +
jsonGet, __LINE__ );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(put_serializable)
{
bool running = true;
Foo foo;
foo.registerDeserializedCallback( [&]{ foo.setNotified(); });
server.subscribe( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( std::string( "PUT /test/Foo HTTP/1.0\r\n\r\n" ),
error411, __LINE__ );
client.test(
std::string( "PUT /test/Foo HTTP/1.0\r\n" + cors_headers +
"Content-Length: 3\r\n\r\nFoo" ),
error400, __LINE__ );
client.test( std::string( "PUT /test/Bar HTTP/1.0\r\n" +
cors_headers +
"Content-Length: " ) +
std::to_string( jsonPut.length( )) + "\r\n\r\n" + jsonPut,
error404, __LINE__ );
BOOST_CHECK( !foo.getNotified( ));
client.test( std::string( "PUT /test/Foo HTTP/1.0\r\n" +
cors_headers +
"Content-Length: " ) +
std::to_string( jsonPut.length( )) + "\r\n\r\n" + jsonPut,
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 0\r\n\r\n" ),
__LINE__);
BOOST_CHECK( foo.getNotified( ));
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(put_event)
{
bool running = true;
bool receivedEmpty = false;
server.subscribe( "empty", zeroeq::PUTFunc
([&]() { receivedEmpty = true; return true; } ));
([&]( const std::string& received )
{ return jsonPut == received; } ));
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( std::string( "PUT /foo HTTP/1.0\r\n\r\n" ),
error411, __LINE__ );
client.test(
std::string( "PUT /foo HTTP/1.0\r\n" + cors_headers +
"Content-Length: 3\r\n\r\nFoo" ),
error400, __LINE__ );
client.test( std::string( "PUT /test/Bar HTTP/1.0\r\n" +
cors_headers +
"Content-Length: " ) +
std::to_string( jsonPut.length( )) + "\r\n\r\n" + jsonPut,
error404, __LINE__ );
client.test( std::string( "PUT /foo HTTP/1.0\r\n" +
cors_headers +
"Content-Length: " ) +
std::to_string( jsonPut.length( )) + "\r\n\r\n" + jsonPut,
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 0\r\n\r\n" ),
__LINE__);
client.test( std::string( "PUT /empty HTTP/1.0\r\n" +
cors_headers +
"Content-Length: 0" + "\r\n\r\n"),
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 0\r\n\r\n" ),
__LINE__);
BOOST_CHECK( receivedEmpty );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(post)
{
bool running = true;
Foo foo;
server.register_( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( std::string( "POST /test/Foo HTTP/1.0\r\n" +
cors_headers +
"Content-Length: " ) +
std::to_string( jsonPut.length( )) + "\r\n\r\n" + jsonPut,
error405, __LINE__ );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(largeGet)
{
bool running = true;
Foo foo;
server.register_( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( "GET" + std::string( 4096, ' ' ) +
"/test/Foo HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 48\r\n\r\n" )+
jsonGet, __LINE__ );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(issue157)
{
bool running = true;
Foo foo;
server.register_( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
// Close client before receiving request to provoke #157
{
Client client( server.getURI( ));
client.sendRequest( "GET" + std::string( 4096, ' ' ) +
"/test/Foo HTTP/1.0\r\n\r\n" );
}
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(garbage)
{
bool running = true;
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( "ramble mumble foo bar", "", __LINE__ );
running = false;
thread.join();
}
BOOST_AUTO_TEST_CASE(urlcasesensitivity)
{
bool running = true;
Foo foo;
server.register_( foo );
std::thread thread( [&]() { while( running ) server.receive( 100 ); });
Client client( server.getURI( ));
client.test( "GET" + std::string( 4096, ' ' ) +
"/TEST/FOO HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 48\r\n\r\n" ) +
jsonGet, __LINE__ );
client.test( "GET" + std::string( 4096, ' ' ) +
"/test/foo HTTP/1.0\r\n\r\n",
std::string( "HTTP/1.0 200 OK\r\n" +
cors_headers +
"Content-Length: 48\r\n\r\n" ) +
jsonGet, __LINE__ );
running = false;
thread.join();
}

Definition at line 31 of file server.h.

Constructor & Destructor Documentation

zeroeq::http::Server::Server ( const URI uri,
Receiver shared 
)

Construct a new HTTP server.

To process requests on the incoming port, call receive(). If no hostname is given, the server listens on all interfaces (INADDR_ANY). If no port is given, the server selects a random port. Use getURI() to retrieve the chosen parameters.

Parameters
uriThe server address in the form "[tcp://][hostname][:port]"
shareda shared receiver, see Receiver constructor.
Exceptions
std::runtime_erroron malformed URI or connection issues.

Member Function Documentation

bool zeroeq::http::Server::add ( servus::Serializable &  object)
inline

Subscribe and register the given object.

Definition at line 95 of file server.h.

References register_(), and subscribe().

+ Here is the call graph for this function:

SocketDescriptor zeroeq::http::Server::getSocketDescriptor ( ) const

Get the underlying socket descriptor.

Can be used by client code to be notified when new data is available and subsequently call receive.

Returns
the socket descriptor.
Exceptions
std::runtime_errorif the descriptor could not be obtained.
const URI& zeroeq::http::Server::getURI ( ) const

Get the publisher URI.

Contains the used hostname and port, if none where given in the constructor uri.

Returns
the publisher URI.
static std::unique_ptr< Server > zeroeq::http::Server::parse ( int  argc,
const char *const *  argv 
)
static

Create a new Server when requested.

The creation and parameters depend on the following command line parameters:

  • –zeroeq-http-server [host][:port]: Enable the server. The optional parameters configure the web server, running by default on INADDR_ANY and a randomly chosen port
bool zeroeq::http::Server::register_ ( servus::Serializable &  object)

Subscribe a serializable object to serve HTTP GET requests.

Every request will be directly handled during receive() by using toJSON(). To track updates on the object, the serializable's received function is called accordingly.

The subscribed object instance has to be valid until unregister().

Parameters
objectthe object to serve during receive()
Returns
true if subscription was successful, false otherwise

Referenced by add().

+ Here is the caller graph for this function:

bool zeroeq::http::Server::register_ ( const std::string &  event,
const GETFunc func 
)

Subscribe an event to serve HTTP GET requests.

Every request will be directly handled during receive() by calling the registered GET function.

Parameters
eventthe event name to serve during receive()
functhe callback function for serving the GET request
Returns
true if subscription was successful, false otherwise
bool zeroeq::http::Server::remove ( const servus::Serializable &  object)
inline

Unsubscribe and unregister the given object.

Definition at line 99 of file server.h.

References unregister(), and unsubscribe().

+ Here is the call graph for this function:

bool zeroeq::http::Server::subscribe ( servus::Serializable &  object)

Subscribe a serializable object to receive updates from HTTP PUT requests.

Every update will be directly applied on the object during receive() using fromJSON(). To track updates on the object, the serializable's updated function is called accordingly.

The subscribed object instance has to be valid until unsubscribe().

Parameters
objectthe object to update on receive()
Returns
true if subscription was successful, false otherwise

Referenced by add().

+ Here is the caller graph for this function:

bool zeroeq::http::Server::subscribe ( const std::string &  event,
const PUTFunc func 
)

Subscribe an event to receive HTTP PUT requests.

Every receival of the event will call the registered callback function.

Parameters
eventthe event name to receive PUT requests for during receive()
functhe callback function for serving the PUT request
Returns
true if subscription was successful, false otherwise
bool zeroeq::http::Server::subscribe ( const std::string &  event,
const PUTPayloadFunc func 
)

Subscribe an event to receive HTTP PUT requests with payload.

Every receival of the event will call the registered callback function.

Parameters
eventthe event name to receive PUT requests for during receive()
functhe callback function for serving the PUT request
Returns
true if subscription was successful, false otherwise
bool zeroeq::http::Server::unregister ( const servus::Serializable &  object)

Unsubscribe the given object for GET requests.

Referenced by remove().

+ Here is the caller graph for this function:

bool zeroeq::http::Server::unregister ( const std::string &  event)

Unsubscribe the given event name for GET requests.

bool zeroeq::http::Server::unsubscribe ( const servus::Serializable &  object)

Unsubscribe the given object to stop applying updates.

Referenced by remove().

+ Here is the caller graph for this function:

bool zeroeq::http::Server::unsubscribe ( const std::string &  event)

Unsubscribe the given event to stop receiving PUT requests.


The documentation for this class was generated from the following file: