rad 6.2.0
Loading...
Searching...
No Matches
subscriber.hpp
Go to the documentation of this file.
1
9#ifndef RAD_MAL_SUBSCRIBER_HPP
10#define RAD_MAL_SUBSCRIBER_HPP
11
12#include <rad/assert.hpp>
13#include <rad/logger.hpp>
14
15#include <mal/Cii.hpp>
16#include <mal/Mal.hpp>
17#include <mal/utility/LoadMal.hpp>
18
19#include <functional>
20
21namespace rad {
22namespace cii {
23
// Owns a MAL subscriber for TOPIC_TYPE together with the asynchronous
// subscription that the constructor creates on it.
// NOTE(review): listing lines 32 (presumably the `class Subscriber {` header)
// and 48 are absent from this extracted listing - confirm against the real
// header before editing the code.
// NOTE(review): std::optional and std::unique_ptr are used below, but neither
// <optional> nor <memory> is among the visible includes; presumably they are
// pulled in transitively - verify.
23template <typename TOPIC_TYPE>
33 public:
// Signature of the user callback passed to subscribeAsync(): it receives the
// MAL subscriber by reference and the data event by const reference.
34 using TopicHandler_t = std::function<void(elt::mal::ps::Subscriber<TOPIC_TYPE>&,
35 const elt::mal::ps::DataEvent<TOPIC_TYPE>&)>;
36
// Creates the MAL subscriber and registers `handler` asynchronously.
//
// uri            Passed (as a copy) to CiiFactory::getSubscriber().
// handler        Callback handed to subscribeAsync().
// mal_properties Optional MAL properties; when empty, a default-constructed
//                elt::mal::Mal::Properties is used instead.
//
// Both the subscriber and the subscription pointers are checked with
// RAD_ASSERTPTR after creation.
// NOTE(review): `mal_properties` is taken as a const value, so the optional is
// copied on every call; a const reference would avoid that.
45 Subscriber(const elt::mal::Uri& uri, TopicHandler_t handler,
46 const std::optional<elt::mal::Mal::Properties> mal_properties = {})
47 : m_subscriber(nullptr), m_subscription(nullptr) {
49
// Obtain the subscriber from the CiiFactory singleton with the DEFAULT QoS.
50 m_subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TOPIC_TYPE>(
51 elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT,
52 mal_properties ? *mal_properties : elt::mal::Mal::Properties());
53 RAD_ASSERTPTR(m_subscriber);
54
// Subscribe with the filter returned by DataEventFilter::all()
// (presumably "accept every event" - confirm against the MAL API).
55 m_subscription =
56 m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(), handler);
57 RAD_ASSERTPTR(m_subscription);
58
59 LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
60 }
61
// Copying is disabled.
62 Subscriber(const Subscriber&) = delete;
63 Subscriber& operator=(const Subscriber&) = delete;
64
// NOTE(review): only the move constructor is provided. Because the copy
// operations are user-declared, no move assignment operator is generated, so
// instances are move-constructible but not assignable.
65 // Enable moving
66 Subscriber(Subscriber&& rhs) = default;
67
68 private:
// Declaration order matters: members are destroyed in reverse order, so the
// subscription is released before the subscriber it was created from.
69 std::unique_ptr<elt::mal::ps::Subscriber<TOPIC_TYPE>>
70 m_subscriber; // Pointer to MAL Subscriber.
71 std::unique_ptr<::elt::mal::ps::Subscription>
72 m_subscription; // Pointer to the topic subscription.
73};
74
75
76#if 0
// Disabled code: this class sits inside an `#if 0` region and is not compiled.
// It subscribes to a topic of TOPIC_TYPE and, for every data event carrying
// valid data, posts a new EVENT_TYPE to the state machine adapter given at
// construction.
// NOTE(review): listing lines 128 and 135 are absent from this extracted
// listing - confirm against the real header before re-enabling.
77template<typename TOPIC_TYPE, typename EVENT_TYPE>
78class SubscriberSM {
79public:
// Stores a reference to the state machine adapter; `sm` must therefore outlive
// this object.
80 explicit SubscriberSM(rad::SMAdapter& sm) : m_sm(sm) {
81 RAD_TRACE(GetLogger());
82 }
83
84 ~SubscriberSM() {
85 RAD_TRACE(GetLogger());
86 }
87
// Loads the MAL named by `malType`, registers it with the CiiFactory under the
// scheme of `uriStr`, creates the subscriber and subscribes asynchronously.
// Throws std::runtime_error when the subscriber or the subscription is null.
88 void Start(const std::string& uriStr,
89 const std::string& malType) {
90 RAD_TRACE(GetLogger());
91
92 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
93
94 elt::mal::Mal::Properties mal_properties;
95 auto mal_instance = elt::mal::loadMal(malType, mal_properties);
96
97 auto uri = elt::mal::Uri(uriStr);
98 std::string scheme = uri.scheme().to_string();
99 factory.registerMal(scheme, mal_instance);
100
// NOTE(review): the result is assigned to `mSubscriber`, but the member
// declared below and tested on the next line is `m_subscriber`; this would not
// compile if the block were enabled.
101 mSubscriber = factory.getSubscriber<TOPIC_TYPE>(uri, elt::mal::ps::qos::QoS::DEFAULT, mal_properties);
102 if (m_subscriber == nullptr) {
103 throw std::runtime_error("Cannot create Subscriber");
104 }
105
106/*
107 auto topicInstance = m_subscriber->createDataEntity();
108 if (topicInstance == nullptr) {
109 throw std::runtime_error("Subscriber cannot create data entity");
110 }
111
112 auto dataEventFilter = elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all();
113*/
114
// The lambda captures `this` and forwards every event to Callback(), so this
// object must stay alive (and at the same address) while the subscription can
// still deliver events.
115 m_subscription = m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
116 [this](elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
117 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
118 this->Callback(Subscriber, event);
119 });
120 if (m_subscription == nullptr) {
121 throw std::runtime_error("Subscriber Async failed");
122 }
123
124 LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
125 }
126
// Closes the MAL subscriber; m_subscription is left untouched (see the todo).
127 void Stop() {
129 // @todo what happens to the subscription?
130 m_subscriber->close();
131 }
132
// Handler invoked for each data event: posts an EVENT_TYPE built from
// event.getData() to the state machine when the event has valid data,
// otherwise logs a warning. The `Subscriber` argument is not used.
133 void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
134 const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
136
137 if (event.hasValidData()) {
138 //auto sample = event.getData();
139 //auto altPos = sample->getAlt();
140 //auto azPos = sample->getAz();
141 //std::cout << "Counter " << ++mNumSamples << " Alt = " << altPos << " Az = " << azPos << std::endl;
142 LOG4CPLUS_DEBUG(GetLogger(), "Received topic, posting event to SM ...");
143 //rad::UniqueEvent(new MalEvents::DevMeas())
144 m_sm.PostEvent(rad::UniqueEvent(new EVENT_TYPE(event.getData())));
145 } else {
146 LOG4CPLUS_WARN(GetLogger(), "Received invalid data!");
147 }
148
149 }
150
// Copying is disabled; moving is left commented out below.
151 SubscriberSM(const SubscriberSM&) = delete;
152 SubscriberSM& operator= (const SubscriberSM&) = delete;
153
154 // Enable moving
155 //SubscriberSM(SubscriberSM&& rhs) = default;
156
157private:
158 rad::SMAdapter& m_sm;
// NOTE(review): m_topic_event is not referenced by any method visible in this
// listing.
159 UniqueEvent m_topic_event;
// NOTE(review): the pointee type `elt::mal::ps::SubscriberSM<TOPIC_TYPE>`
// looks like a rename slip for `elt::mal::ps::Subscriber<TOPIC_TYPE>`, which
// is the type Callback() receives - confirm against the MAL API.
160 std::shared_ptr<elt::mal::ps::SubscriberSM<TOPIC_TYPE>> m_subscriber;
161 std::unique_ptr<::elt::mal::ps::Subscription> m_subscription;
162};
163#endif
164
165} // namespace cii
166} // namespace rad
167
168#endif // RAD_MAL_SUBSCRIBER_HPP
Assert header file.
#define RAD_ASSERTPTR(a)
Definition assert.hpp:19
Definition smAdapter.hpp:60
Definition subscriber.hpp:32
Subscriber & operator=(const Subscriber &)=delete
std::function< void(elt::mal::ps::Subscriber< TOPIC_TYPE > &, const elt::mal::ps::DataEvent< TOPIC_TYPE > &)> TopicHandler_t
Definition subscriber.hpp:34
Subscriber(const Subscriber &)=delete
Subscriber(const elt::mal::Uri &uri, TopicHandler_t handler, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition subscriber.hpp:45
Subscriber(Subscriber &&rhs)=default
Logger class.
#define RAD_TRACE(logger)
Definition logger.hpp:21
log4cplus::Logger & GetLogger()
Definition logger.cpp:14
Definition actionsApp.cpp:23
std::unique_ptr< AnyEvent > UniqueEvent
Definition anyEvent.hpp:45
log4cplus::Logger & GetLogger()
Definition logger.cpp:72