9#ifndef RAD_MAL_SUBSCRIBER_HPP
10#define RAD_MAL_SUBSCRIBER_HPP
17#include <mal/utility/LoadMal.hpp>
31template <
typename TOPIC_TYPE>
34 using TopicHandler_t = std::function<void(elt::mal::ps::Subscriber<TOPIC_TYPE>&,
35 const elt::mal::ps::DataEvent<TOPIC_TYPE>&)>;
46 const std::optional<elt::mal::Mal::Properties> mal_properties = {})
47 : m_subscriber(nullptr), m_subscription(nullptr) {
50 m_subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TOPIC_TYPE>(
51 elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT,
52 mal_properties ? *mal_properties : elt::mal::Mal::Properties());
56 m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(), handler);
59 LOG4CPLUS_DEBUG(
GetLogger(),
"Subscriber registered async.");
69 std::unique_ptr<elt::mal::ps::Subscriber<TOPIC_TYPE>>
71 std::unique_ptr<::elt::mal::ps::Subscription>
77template<
typename TOPIC_TYPE,
typename EVENT_TYPE>
88 void Start(
const std::string& uriStr,
89 const std::string& malType) {
92 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
94 elt::mal::Mal::Properties mal_properties;
95 auto mal_instance = elt::mal::loadMal(malType, mal_properties);
97 auto uri = elt::mal::Uri(uriStr);
98 std::string scheme = uri.scheme().to_string();
99 factory.registerMal(scheme, mal_instance);
101 mSubscriber = factory.getSubscriber<TOPIC_TYPE>(uri, elt::mal::ps::qos::QoS::DEFAULT, mal_properties);
102 if (m_subscriber ==
nullptr) {
103 throw std::runtime_error(
"Cannot create Subscriber");
115 m_subscription = m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
116 [
this](elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
117 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
118 this->Callback(Subscriber, event);
120 if (m_subscription ==
nullptr) {
121 throw std::runtime_error(
"Subscriber Async failed");
124 LOG4CPLUS_DEBUG(
GetLogger(),
"Subscriber registered async.");
130 m_subscriber->close();
133 void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
134 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
137 if (event.hasValidData()) {
142 LOG4CPLUS_DEBUG(
GetLogger(),
"Received topic, posting event to SM ...");
146 LOG4CPLUS_WARN(
GetLogger(),
"Received invalid data!");
151 SubscriberSM(
const SubscriberSM&) =
delete;
152 SubscriberSM& operator= (
const SubscriberSM&) =
delete;
160 std::shared_ptr<elt::mal::ps::SubscriberSM<TOPIC_TYPE>> m_subscriber;
161 std::unique_ptr<::elt::mal::ps::Subscription> m_subscription;
#define RAD_ASSERTPTR(a)
Definition assert.hpp:19
Definition smAdapter.hpp:60
Definition subscriber.hpp:32
Subscriber & operator=(const Subscriber &)=delete
std::function< void(elt::mal::ps::Subscriber< TOPIC_TYPE > &, const elt::mal::ps::DataEvent< TOPIC_TYPE > &)> TopicHandler_t
Definition subscriber.hpp:34
Subscriber(const Subscriber &)=delete
Subscriber(const elt::mal::Uri &uri, TopicHandler_t handler, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition subscriber.hpp:45
Subscriber(Subscriber &&rhs)=default
#define RAD_TRACE(logger)
Definition logger.hpp:21
log4cplus::Logger & GetLogger()
Definition logger.cpp:14
Definition actionsApp.cpp:23
std::unique_ptr< AnyEvent > UniqueEvent
Definition anyEvent.hpp:45
log4cplus::Logger & GetLogger()
Definition logger.cpp:72