Line data Source code
1 :
2 : /* Copyright (c) 2014-2017, Human Brain Project
3 : * Daniel Nachbaur <daniel.nachbaur@epfl.ch>
4 : * Stefan.Eilemann@epfl.ch
5 : */
6 :
7 : #ifndef ZEROEQ_SUBSCRIBER_H
8 : #define ZEROEQ_SUBSCRIBER_H
9 :
10 : #include <zeroeq/receiver.h> // base class
11 : #include <zeroeq/uri.h> // used inline
12 :
13 : #include <vector>
14 :
15 : namespace zeroeq
16 : {
17 : /**
18 : * Subscribes to Publisher to receive events.
19 : *
20 : * If the subscriber is in the same session as discovered publishers, it
21 : * automatically subscribes to those publishers. Publishers from the same
22 : * application instance are not considered though.
23 : *
24 : * A subscription to a non-existing publisher is valid. It will start receiving
25 : * events once the other publisher(s) is(are) publishing.
26 : *
27 : * A receive on any Subscriber of a shared group will work on all subscribers
28 : * and call the registered handlers.
29 : *
30 : * Not thread safe.
31 : *
32 : * Example: @include tests/subscriber.cpp
33 : */
34 : class Subscriber : public Receiver
35 : {
36 : public:
37 : /**
38 : * Create a default subscriber.
39 : *
40 : * Postconditions:
41 : * - discovers publishers on _zeroeq_pub._tcp ZeroConf service
42 : * - filters session \<username\> or ZEROEQ_PUB_SESSION from environment
43 : *
44 : * @throw std::runtime_error if ZeroConf is not available
45 : */
46 : ZEROEQ_API Subscriber();
47 :
48 : /**
49 : * Create a subscriber which subscribes to publisher(s) from the given
50 : * session.
51 : *
52 : * Postconditions:
53 : * - discovers publishers on _zeroeq_pub._tcp ZeroConf service
54 : * - filters for given session
55 : *
56 : * @param session session name used for filtering of discovered publishers
57 : * @throw std::runtime_error if ZeroConf is not available
58 : */
59 : ZEROEQ_API explicit Subscriber(const std::string& session);
60 :
61 : /**
62 : * Create a subscriber which subscribes to specific publishers.
63 : *
64 : * Postconditions:
65 : * - connected to the publishers on the given URIs once publishers are
66 : * running on the URIs
67 : *
68 : * @param uris publisher URIs in the format [scheme://]*|host|IP|IF:port
69 : * @throw std::runtime_error if an URI is not fully qualified
70 : */
71 : ZEROEQ_API explicit Subscriber(const URIs& uris);
72 :
73 : /**
74 : * Create a default shared subscriber.
75 : *
76 : * @sa Subscriber()
77 : * @param shared another receiver to share data reception with
78 : */
79 : ZEROEQ_API explicit Subscriber(Receiver& shared);
80 :
81 : /**
82 : * Create a shared subscriber which subscribes to publisher(s) from the
83 : * given session.
84 : *
85 : * @sa Subscriber( const std::string& )
86 : *
87 : * @param session only subscribe to publishers of the same session
88 : * @param shared another receiver to share data reception with
89 : */
90 : ZEROEQ_API Subscriber(const std::string& session, Receiver& shared);
91 :
92 : /**
93 : * Create a shared subscriber which subscribes to publishers on the given
94 : * URIs.
95 : *
96 : * @sa Subscriber( const URIs& )
97 : *
98 : * @param uris publisher URIs in the format [scheme://]*|host|IP|IF:port
99 : * @param shared another receiver to share data reception with
100 : */
101 : ZEROEQ_API Subscriber(const URIs& uris, Receiver& shared);
102 :
103 : /** Destroy this subscriber and withdraw any subscriptions. */
104 : ZEROEQ_API ~Subscriber();
105 :
106 14 : explicit Subscriber(const URI& uri) //!< @deprecated
107 16 : : Subscriber(URIs{uri})
108 : {
109 12 : }
110 4 : Subscriber(const URI& uri, Receiver& shared) //!< @deprecated
111 5 : : Subscriber(URIs{uri}, shared)
112 : {
113 3 : }
114 :
115 : /**
116 : * Subscribe a serializable object to receive updates from any connected
117 : * publisher.
118 : *
119 : * Every update will be directly applied on the object during receive(). To
120 : * track updates on the object, the serializable's updated function is
121 : * called accordingly.
122 : *
123 : * The subscribed object instance has to be valid until unsubscribe().
124 : *
125 : * @param serializable the object to update on receive()
126 : * @return true if subscription was successful, false otherwise
127 : */
128 : ZEROEQ_API bool subscribe(servus::Serializable& serializable);
129 :
130 : /**
131 : * Subscribe to an event from any connected publisher.
132 : *
133 : * Every receival of the event will call the registered callback function.
134 : *
135 : * @param event the event identifier to subscribe to
136 : * @param func the callback function called upon receival
137 : * @return true if subscription was successful, false otherwise
138 : */
139 : ZEROEQ_API bool subscribe(const uint128_t& event, const EventFunc& func);
140 :
141 : /**
142 : * Subscribe to an event with payload from any connected publisher.
143 : *
144 : * Every receival of the event will call the registered callback function.
145 : *
146 : * @param event the event identifier to subscribe to
147 : * @param func the callback function called upon receival
148 : * @return true if subscription was successful, false otherwise
149 : */
150 : ZEROEQ_API bool subscribe(const uint128_t& event,
151 : const EventPayloadFunc& func);
152 :
153 : /**
154 : * Unsubscribe a serializable object to stop applying updates from any
155 : * connected publisher.
156 : *
157 : * @param serializable the object to stop updating on receive()
158 : * @return true if removal of subscription was successful, false otherwise
159 : */
160 : ZEROEQ_API bool unsubscribe(const servus::Serializable& serializable);
161 :
162 : ZEROEQ_API bool unsubscribe(const uint128_t& event);
163 :
164 : /** @return the session name that is used for filtering. */
165 : ZEROEQ_API const std::string& getSession() const;
166 :
167 : private:
168 : class Impl;
169 : std::unique_ptr<Impl> _impl;
170 :
171 : // Receiver API
172 : void addSockets(std::vector<detail::Socket>& entries) final;
173 : bool process(detail::Socket& socket) final;
174 : void update() final;
175 : void addConnection(const std::string& uri) final;
176 : };
177 : }
178 :
179 : #endif
|