9 #ifndef RAD_MAL_SUBSCRIBER_HPP
10 #define RAD_MAL_SUBSCRIBER_HPP
12 #include <rad/assert.hpp>
13 #include <rad/logger.hpp>
15 #include <mal/Cii.hpp>
16 #include <mal/Mal.hpp>
17 #include <mal/utility/LoadMal.hpp>
31 template <
typename TOPIC_TYPE>
34 using TopicHandler_t = std::function<void(elt::mal::ps::Subscriber<TOPIC_TYPE>&,
35 const elt::mal::ps::DataEvent<TOPIC_TYPE>&)>;
46 const std::optional<elt::mal::Mal::Properties> mal_properties = {})
47 : m_subscriber(
nullptr), m_subscription(
nullptr) {
50 m_subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TOPIC_TYPE>(
51 elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT,
52 mal_properties ? *mal_properties : elt::mal::Mal::Properties());
56 m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
handler);
59 LOG4CPLUS_DEBUG(
GetLogger(),
"Subscriber registered async.");
69 std::unique_ptr<elt::mal::ps::Subscriber<TOPIC_TYPE>>
71 std::unique_ptr<::elt::mal::ps::Subscription>
84 template <
typename TOPIC_TYPE,
typename EVENT_TYPE>
97 const std::optional<elt::mal::Mal::Properties> mal_properties = {})
100 std::placeholders::_1, std::placeholders::_2),
106 void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& subscriber,
107 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
110 if (event.hasValidData()) {
111 LOG4CPLUS_DEBUG(
GetLogger(),
"Received topic, posting event to SM via ASIO.");
114 LOG4CPLUS_WARN(
GetLogger(),
"Received invalid data!");
129 template<
typename TOPIC_TYPE,
typename EVENT_TYPE>
140 void Start(
const std::string& uriStr,
141 const std::string& malType) {
144 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
146 elt::mal::Mal::Properties mal_properties;
147 auto mal_instance = elt::mal::loadMal(malType, mal_properties);
149 auto uri = elt::mal::Uri(uriStr);
150 std::string scheme = uri.scheme().to_string();
151 factory.registerMal(scheme, mal_instance);
153 mSubscriber = factory.getSubscriber<TOPIC_TYPE>(uri, elt::mal::ps::qos::QoS::DEFAULT, mal_properties);
154 if (m_subscriber ==
nullptr) {
155 throw std::runtime_error(
"Cannot create Subscriber");
167 m_subscription = m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
168 [
this](elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
169 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
170 this->Callback(Subscriber, event);
172 if (m_subscription ==
nullptr) {
173 throw std::runtime_error(
"Subscriber Async failed");
176 LOG4CPLUS_DEBUG(
GetLogger(),
"Subscriber registered async.");
182 m_subscriber->close();
185 void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
186 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
189 if (event.hasValidData()) {
194 LOG4CPLUS_DEBUG(
GetLogger(),
"Received topic, posting event to SM ...");
198 LOG4CPLUS_WARN(
GetLogger(),
"Received invalid data!");
203 SubscriberSM(
const SubscriberSM&) =
delete;
204 SubscriberSM& operator= (
const SubscriberSM&) =
delete;
212 std::shared_ptr<elt::mal::ps::SubscriberSM<TOPIC_TYPE>> m_subscriber;
213 std::unique_ptr<::elt::mal::ps::Subscription> m_subscription;
220 #endif // RAD_MAL_SUBSCRIBER_HPP