Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Daniel Nachbaur <daniel.nachbaur@epfl.ch>
4 : * Stefan.Eilemann@epfl.ch
5 : */
6 :
7 : #include "publisher.h"
8 :
9 : #include "detail/byteswap.h"
10 : #include "detail/common.h"
11 : #include "detail/constants.h"
12 : #include "detail/sender.h"
13 : #include "log.h"
14 :
15 : #include <servus/serializable.h>
16 :
17 : #include <zmq.h>
18 :
19 : #include <cstring>
20 : #include <map>
21 :
22 : namespace zeroeq
23 : {
24 : class Publisher::Impl : public detail::Sender
25 : {
26 : public:
27 26 : Impl(const URI& uri_, const std::string& session)
28 26 : : detail::Sender(uri_, ZMQ_XPUB, PUBLISHER_SERVICE,
29 52 : session == DEFAULT_SESSION ? getDefaultPubSession()
30 28 : : session)
31 : {
32 26 : if (session.empty())
33 1 : ZEROEQTHROW(std::runtime_error(
34 : "Empty session is not allowed for publisher"));
35 :
36 50 : const std::string& zmqURI = buildZmqURI(uri);
37 25 : if (zmq_bind(socket.get(), zmqURI.c_str()) == -1)
38 1 : ZEROEQTHROW(std::runtime_error(
39 : std::string("Cannot bind publisher socket '") + zmqURI +
40 : "': " + zmq_strerror(zmq_errno())));
41 :
42 24 : initURI();
43 24 : if (session != NULL_SESSION)
44 10 : announce();
45 24 : }
46 :
47 24 : ~Impl() {}
48 40023 : bool publish(const servus::Serializable& serializable)
49 : {
50 80046 : const servus::Serializable::Data& data = serializable.toBinary();
51 40023 : return publish(serializable.getTypeIdentifier(), data.ptr.get(),
52 120069 : data.size);
53 : }
54 :
55 40032 : bool publish(uint128_t event, const void* data, const size_t size)
56 : {
57 : #ifdef ZEROEQ_BIGENDIAN
58 : detail::byteswap(event); // convert to little endian wire protocol
59 : #endif
60 40032 : const bool hasPayload = data && size > 0;
61 :
62 : zmq_msg_t msgHeader;
63 40032 : zmq_msg_init_size(&msgHeader, sizeof(event));
64 40032 : memcpy(zmq_msg_data(&msgHeader), &event, sizeof(event));
65 40032 : int ret = zmq_msg_send(&msgHeader, socket.get(),
66 40032 : hasPayload ? ZMQ_SNDMORE : 0);
67 40032 : zmq_msg_close(&msgHeader);
68 40032 : if (ret == -1)
69 : {
70 : ZEROEQWARN << "Cannot publish message header, got "
71 0 : << zmq_strerror(zmq_errno()) << std::endl;
72 0 : return false;
73 : }
74 :
75 40032 : if (!hasPayload)
76 20009 : return true;
77 :
78 : zmq_msg_t msg;
79 20023 : zmq_msg_init_size(&msg, size);
80 20023 : ::memcpy(zmq_msg_data(&msg), data, size);
81 20023 : ret = zmq_msg_send(&msg, socket.get(), 0);
82 20023 : zmq_msg_close(&msg);
83 20023 : if (ret == -1)
84 : {
85 : ZEROEQWARN << "Cannot publish message data, got "
86 0 : << zmq_strerror(zmq_errno()) << std::endl;
87 0 : return false;
88 : }
89 20023 : return true;
90 : }
91 : };
92 :
93 1 : Publisher::Publisher()
94 1 : : _impl(new Impl(URI(), DEFAULT_SESSION))
95 : {
96 1 : }
97 :
98 20 : Publisher::Publisher(const std::string& session)
99 21 : : _impl(new Impl(URI(), session))
100 : {
101 19 : }
102 :
103 2 : Publisher::Publisher(const URI& uri)
104 3 : : _impl(new Impl(uri, DEFAULT_SESSION))
105 : {
106 1 : }
107 :
108 3 : Publisher::Publisher(const URI& uri, const std::string& session)
109 3 : : _impl(new Impl(uri, session))
110 : {
111 3 : }
112 :
113 24 : Publisher::~Publisher()
114 : {
115 24 : }
116 :
117 40023 : bool Publisher::publish(const servus::Serializable& serializable)
118 : {
119 40023 : return _impl->publish(serializable);
120 : }
121 :
122 6 : bool Publisher::publish(const uint128_t& event)
123 : {
124 6 : return _impl->publish(event, nullptr, 0);
125 : }
126 :
127 3 : bool Publisher::publish(const uint128_t& event, const void* data,
128 : const size_t size)
129 : {
130 3 : return _impl->publish(event, data, size);
131 : }
132 :
133 2 : std::string Publisher::getAddress() const
134 : {
135 2 : return _impl->getAddress();
136 : }
137 :
138 7 : const std::string& Publisher::getSession() const
139 : {
140 7 : return _impl->getSession();
141 : }
142 :
143 15 : const URI& Publisher::getURI() const
144 : {
145 15 : return _impl->uri;
146 : }
147 :
148 2 : zmq::SocketPtr Publisher::getSocket()
149 : {
150 2 : return _impl->socket;
151 : }
152 24 : }
|