rad 6.2.0
Loading...
Searching...
No Matches
msgRequestor.hpp
Go to the documentation of this file.
1
9#ifndef RAD_MSG_REQUESTOR_HPP
10#define RAD_MSG_REQUESTOR_HPP
11
12#include <rad/assert.hpp>
13#include <rad/errors.hpp>
14#include <rad/exceptions.hpp>
15#include <rad/logger.hpp>
17
18#include <azmq/message.hpp>
19#include <azmq/socket.hpp>
20
21#include <boost/asio.hpp>
22
23#include <google/protobuf/message.h>
24
25#include <chrono>
26#include <memory>
27
28namespace rad {
29
33template <typename TYPEREQ, typename TYPEREP>
35 public:
36 MsgRequestor(const std::string& endpoint, const std::string& identity,
37 boost::asio::io_context& ios,
38 std::function<void(const std::error_code&, TYPEREP)> reply_handler);
39 virtual ~MsgRequestor();
40
41 size_t Send(const TYPEREQ& payload, const long timeout = 0);
42
43 MsgRequestor(const MsgRequestor&) = delete;
45
46 private:
47 void Callback(const std::error_code& err_code, const std::string& msg_type_id, const void* data,
48 const size_t data_size);
49
50 MsgRequestorRaw m_msg_requestor_raw;
51 std::function<void(const std::error_code&, TYPEREP)> m_reply_handler;
52};
53
62template <typename TYPEREQ, typename TYPEREP>
64 const std::string& endpoint, const std::string& identity, boost::asio::io_context& ios,
65 std::function<void(const std::error_code&, TYPEREP)> reply_handler)
66 : m_msg_requestor_raw(
67 endpoint, identity, ios,
68 std::bind(&MsgRequestor::Callback, this, std::placeholders::_1, std::placeholders::_2,
69 std::placeholders::_3, std::placeholders::_4)),
70 m_reply_handler(reply_handler) {
72}
73
77template <typename TYPEREQ, typename TYPEREP>
81
89template <typename TYPEREQ, typename TYPEREP>
90size_t MsgRequestor<TYPEREQ, TYPEREP>::Send(const TYPEREQ& payload, const long timeout) {
92
93 /*
94 * Important: never call google::protobuf::ShutdownProtobufLibrary()
95 * before accessing the message descriptor!
96 */
97 RAD_ASSERTPTR(payload.GetDescriptor());
98 std::string payload_type = payload.GetDescriptor()->full_name();
99
100 std::string str;
101 if (payload.SerializeToString(&str) == false) {
102 LOG4CPLUS_ERROR(GetLogger(),
103 "Failed serializing to string payload type <" << payload_type << ">");
104 return 0;
105 }
106 return m_msg_requestor_raw.Send(payload_type, str, timeout);
107}
108
117template <typename TYPEREQ, typename TYPEREP>
118void MsgRequestor<TYPEREQ, TYPEREP>::Callback(const std::error_code& err_code,
119 const std::string& msg_type_id, const void* data,
120 const size_t data_size) {
122
123 TYPEREP reply;
124
125 if (err_code) {
126 m_reply_handler(err_code, reply);
127 return;
128 }
129
130 if (reply.ParseFromArray(data, data_size)) {
131 m_reply_handler({}, reply);
132 } else {
133 LOG4CPLUS_ERROR(GetLogger(), "Failed to parse reply type <" << msg_type_id << ">");
134 m_reply_handler(rad::ErrorCodes::DESERIALIZATION_ERR, reply);
135 // @todo throw exception?
136 }
137}
138
145template <typename TREQ, typename TREP>
147 using request_t = TREQ;
148 using reply_t = TREP;
149 using handler_t = std::function<void(const std::error_code&, reply_t)>;
150
160 MsgRequestor2(const std::string& endpoint, const std::string& identity,
161 boost::asio::io_context& ios)
162 : m_raw_requestor(endpoint, identity, ios) {}
163 virtual ~MsgRequestor2(){};
164
165 MsgRequestor2(const MsgRequestor2&) = delete;
169
180 const request_t& payload, handler_t handler,
181 std::chrono::milliseconds const timeout = std::chrono::milliseconds(0)) {
183 RAD_ASSERTPTR(payload.GetDescriptor());
184 std::string payload_type = payload.GetDescriptor()->full_name();
185
186 std::string str;
187 if (payload.SerializeToString(&str) == false) {
188 LOG4CPLUS_ERROR(GetLogger(),
189 "Failed serializing to string payload type <" << payload_type << ">");
190 return 0;
191 }
192 return m_raw_requestor.AsyncSendReceive(
193 payload_type, str,
194 [handler](std::error_code const& ec, std::string const& msg_type_id, const void* p_data,
195 const size_t size) {
196 // RAD_TRACE(GetLogger());
197 reply_t reply;
198 if (ec) {
199 handler(ec, reply);
200 return;
201 }
202
203 // Assert p_data != nullptr
204 if (reply.ParseFromArray(p_data, size)) {
205 handler({}, reply);
206 } else {
207 // LOG4CPLUS_ERROR(GetLogger(), "Failed to parse reply type <" << msg_type_id <<
208 // ">");
210 }
211 },
212 timeout);
213 }
214
215 private:
216 MsgRequestorRaw2 m_raw_requestor;
217};
218
219} // namespace rad
220
221#endif // RAD_MSG_REQUESTOR_HPP
Assert header file.
#define RAD_ASSERTPTR(a)
Definition assert.hpp:19
Definition msgRequestorRaw.hpp:30
Definition msgRequestor.hpp:34
MsgRequestor(const std::string &endpoint, const std::string &identity, boost::asio::io_context &ios, std::function< void(const std::error_code &, TYPEREP)> reply_handler)
Definition msgRequestor.hpp:63
MsgRequestor(const MsgRequestor &)=delete
size_t Send(const TYPEREQ &payload, const long timeout=0)
Definition msgRequestor.hpp:90
virtual ~MsgRequestor()
Definition msgRequestor.hpp:78
MsgRequestor & operator=(const MsgRequestor &)=delete
Logger class.
#define RAD_TRACE(logger)
Definition logger.hpp:21
Errors header file.
Exception classes header file.
MsgRequestorRaw class header file.
Definition actionsApp.cpp:23
log4cplus::Logger & GetLogger()
Definition logger.cpp:72
Definition errors.hpp:58
Definition msgRequestor.hpp:146
size_t AsyncSendReceive(const request_t &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition msgRequestor.hpp:179
MsgRequestor2(MsgRequestor2 &&)=default
MsgRequestor2(const MsgRequestor2 &)=delete
std::function< void(const std::error_code &, reply_t)> handler_t
Definition msgRequestor.hpp:149
TREQ request_t
Definition msgRequestor.hpp:147
TREP reply_t
Definition msgRequestor.hpp:148
MsgRequestor2(const std::string &endpoint, const std::string &identity, boost::asio::io_context &ios)
Definition msgRequestor.hpp:160
MsgRequestor2 & operator=(const MsgRequestor2 &)=delete
virtual ~MsgRequestor2()
Definition msgRequestor.hpp:163
MsgRequestor2 & operator=(MsgRequestor2 &&)=default
Definition msgRequestorRaw.hpp:71
size_t AsyncSendReceive(std::string const &payload_type, std::string const &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition msgRequestorRaw.cpp:287