8 #include <opentracing/ext/tags.h>
9 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
17 #include <boost/functional/hash.hpp>
19 #include <mal/Mal.hpp>
20 #include <mal/util/MalTracingUtil.hpp>
21 #include <mal/utility/future.hpp>
22 #include <mal/rr/ClientAsync.hpp>
23 #include <mal/rr/AmiZpb.hpp>
24 #include <mal/ps/ZpbDataEntity.hpp>
25 #include <mal/utility/Uri.hpp>
26 #include <mal/util/ZpbList.hpp>
27 #include <mal/types/ZpbMalTypes.hpp>
28 #include <mal/rr/ServerContextZpb.hpp>
29 #include <mal/rr/ServerContextZpbAmi.hpp>
30 #include <mal/rr/ServerContextProvider.hpp>
// Fragments of the ExceptionErr wrapper class (class header and several
// signatures are elided from this excerpt).
// Alias for the generated protobuf message type that this class wraps.
41 typedef ::generated::zpb::hellomalif::ExceptionErr
ZpbType;
// Constructor initializer: holds `inner` in the shared_ptr together with a
// NoopDeleter. NOTE(review): presumably this means `inner` is not owned by the
// wrapper — confirm against NoopDeleter's definition.
47 m_zpbObj(inner, ::
elt::mal::util::list::NoopDeleter<
ZpbType*>())
// Accessor body (signature elided): reads `desc` from the wrapped message.
78 return m_zpbObj->desc();
// Copies the description of `from` into this object via setDesc().
81 void copy(const ::hellomalif::ExceptionErr &from) {
82 setDesc(from.getDesc());
// Writes `desc` into the wrapped protobuf message.
90 void setDesc(
const std::string&
desc) {
91 m_zpbObj->set_desc(
desc);
// The wrapped protobuf message.
94 std::shared_ptr<ZpbType> m_zpbObj;
// LogInfo implementation backed by a generated protobuf message. The class
// name line and any preceding base classes are outside this excerpt.
113 public virtual ::elt::mal::ps::ZpbDataEntity<::hellomalif::LogInfo> {
// Alias for the generated protobuf message type that this class wraps.
115 typedef ::generated::zpb::hellomalif::LogInfo
ZpbType;
// Constructor initializer: holds `inner` in the shared_ptr together with a
// NoopDeleter. NOTE(review): presumably non-owning — confirm against
// NoopDeleter's definition.
121 m_zpbObj(inner, ::
elt::mal::util::list::NoopDeleter<
ZpbType*>())
// Constructor taking the protobuf message by const reference; its initializer
// list is elided from this excerpt.
125 explicit LogInfoImpl(const ::generated::zpb::hellomalif::LogInfo& inner):
// Accessor bodies (signatures elided): read and write the `level` and `logger`
// fields of the wrapped message.
146 return m_zpbObj->level();
150 m_zpbObj->set_level(
level);
153 return m_zpbObj->logger();
157 m_zpbObj->set_logger(
logger);
// Returns a copy of this entity as a unique_ptr<LogInfo>. The construction of
// `cloned` and the return statement are on lines elided from this excerpt; the
// visible part only propagates trace information.
160 std::unique_ptr<::hellomalif::LogInfo>
clone()
const override {
// Check whether this object also implements Traceable.
163 auto const* traceableDataEntity =
dynamic_cast<const ::elt::mal::ps::Traceable*
>(
this);
164 if (traceableDataEntity) {
// Copy the trace only when the clone implements Traceable as well.
165 auto *clonedTraceable =
dynamic_cast<::elt::mal::ps::Traceable*
>(cloned.get());
166 if (clonedTraceable) {
167 clonedTraceable->setTrace(traceableDataEntity->getTrace());
// Compares this entity's key with that of `other`; the comparison itself is on
// lines elided from this excerpt.
173 bool keyEquals(const ::hellomalif::LogInfo& other)
const override {
// NOTE(review): this seed (original line 178) presumably belongs to a separate
// hashing method whose signature is not visible here — confirm in the full file.
178 std::size_t seed = 0;
// Returns a unique_ptr<LogInfo>. The visible part starts from a
// default-constructed LogInfoImpl; any field copying and the return statement
// are elided from this excerpt.
182 std::unique_ptr<::hellomalif::LogInfo>
cloneKey()
const override {
183 std::unique_ptr<::hellomalif::LogInfo> entity(
new LogInfoImpl());
// Serialization / parsing helper bodies; the enclosing method signatures are
// elided from this excerpt.
// Rebuild `msg` with the message's serialized byte size, then serialize
// directly into its buffer.
192 msg.rebuild(m_zpbObj->ByteSizeLong());
193 return m_zpbObj->SerializeToArray(msg.data(), msg.size());
// Serialize the wrapped message into the string `output`.
197 return m_zpbObj->SerializeToString(&output);
// Parse the wrapped message from the string `data`.
201 return m_zpbObj->ParseFromString(data);
// Parse the wrapped message from a raw buffer (`data`, `size`) through an
// ArrayInputStream.
205 google::protobuf::io::ArrayInputStream ais(data, size);
206 return m_zpbObj->ParseFromZeroCopyStream(&ais);
// Copies state from `from` into this object. The field copies are on lines
// elided from this excerpt; the visible part transfers trace information.
209 void copy(const ::hellomalif::LogInfo &from)
override {
// View the source as Traceable.
213 auto const* traceableFrom =
dynamic_cast<const ::elt::mal::ps::Traceable*
>(&from);
// View this object as Traceable.
215 auto* traceableTo =
dynamic_cast<::elt::mal::ps::Traceable*
>(
this);
// NOTE(review): any null checks on the two casts are on elided lines
// (214/216) — confirm they exist before relying on this call being guarded.
217 traceableTo->setTrace(traceableFrom->getTrace());
// The wrapped protobuf message.
233 std::shared_ptr<ZpbType> m_zpbObj;
// Template header; the declaration it introduces is on lines elided from this
// excerpt.
252 template<
typename Intf>
// Base-class list of the asynchronous client: a ClientAsyncImpl parameterised
// on the StdCmdsAsync interface and the generated request/reply message types.
258 :
public virtual ::elt::mal::rr::ClientAsyncImpl<::hellomalif::StdCmdsAsync,
259 ::generated::zpb::hellomalif::StdCmds_Request,
260 ::generated::zpb::hellomalif::StdCmds_Reply>,
// Constructor parameters: endpoint URI, standard QoS list, MAL-specific
// properties and the ZpbMal instance.
264 const ::elt::mal::Uri &uri,
265 const std::vector<std::shared_ptr<::elt::mal::rr::qos::QoS>> &standardQoS,
266 const ::elt::mal::Mal::Properties &malSpecificProperties,
267 const std::shared_ptr<::elt::mal::ZpbMal> &zpbMal):
// The arguments are forwarded unchanged (presumably to the base-class
// constructor, whose name is on an elided line); the constructor body is empty.
270 uri, standardQoS, malSpecificProperties, zpbMal) {}
// Asynchronous Init stub: sends the request and returns a future that yields
// the reply's string retval. NOTE(review): several original lines (the
// declarations around line 275 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
272 ::elt::mal::future<std::string>
Init()
override {
// Allocates the Init input message; the variable receiving it is not visible.
275 new ::generated::zpb::hellomalif::StdCmds_Init_In();
// Span named "Init", tagged with span_kind = rpc_client.
281 auto span = ::elt::mal::util::tracing::createSpan(
282 "Init", {::opentracing::SetTag {
283 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
285 return sendRequest(request, std::move(span)).then([](
286 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
287 auto reply = replyFut.get();
288 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
290 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
291 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
293 errMsg <<
"Code:" << code <<
" Message:";
294 reply.header().has_exceptionmessage() ?
295 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
297 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
298 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
299 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
301 auto data = reply.data();
302 if (data.init().has_exexceptionerr()) {
303 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.init().exexceptionerr()));
305 return (std::string)data.init().retval();
// Asynchronous Reset stub: sends the request and returns a future that yields
// the reply's string retval. NOTE(review): several original lines (the
// declarations around line 312 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
309 ::elt::mal::future<std::string>
Reset()
override {
// Allocates the Reset input message; the variable receiving it is not visible.
312 new ::generated::zpb::hellomalif::StdCmds_Reset_In();
// Span named "Reset", tagged with span_kind = rpc_client.
318 auto span = ::elt::mal::util::tracing::createSpan(
319 "Reset", {::opentracing::SetTag {
320 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
322 return sendRequest(request, std::move(span)).then([](
323 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
324 auto reply = replyFut.get();
325 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
327 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
328 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
330 errMsg <<
"Code:" << code <<
" Message:";
331 reply.header().has_exceptionmessage() ?
332 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
334 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
335 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
336 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
338 auto data = reply.data();
339 if (data.reset().has_exexceptionerr()) {
340 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.reset().exexceptionerr()));
342 return (std::string)data.reset().retval();
// Asynchronous Enable stub: sends the request and returns a future that yields
// the reply's string retval. NOTE(review): several original lines (the
// declarations around line 349 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
346 ::elt::mal::future<std::string>
Enable()
override {
// Allocates the Enable input message; the variable receiving it is not visible.
349 new ::generated::zpb::hellomalif::StdCmds_Enable_In();
// Span named "Enable", tagged with span_kind = rpc_client.
355 auto span = ::elt::mal::util::tracing::createSpan(
356 "Enable", {::opentracing::SetTag {
357 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
359 return sendRequest(request, std::move(span)).then([](
360 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
361 auto reply = replyFut.get();
362 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
364 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
365 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
367 errMsg <<
"Code:" << code <<
" Message:";
368 reply.header().has_exceptionmessage() ?
369 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
371 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
372 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
373 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
375 auto data = reply.data();
376 if (data.enable().has_exexceptionerr()) {
377 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.enable().exexceptionerr()));
379 return (std::string)data.enable().retval();
// Asynchronous Disable stub: sends the request and returns a future that
// yields the reply's string retval. NOTE(review): several original lines (the
// declarations around line 386 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
383 ::elt::mal::future<std::string>
Disable()
override {
// Allocates the Disable input message; the variable receiving it is not visible.
386 new ::generated::zpb::hellomalif::StdCmds_Disable_In();
// Span named "Disable", tagged with span_kind = rpc_client.
392 auto span = ::elt::mal::util::tracing::createSpan(
393 "Disable", {::opentracing::SetTag {
394 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
396 return sendRequest(request, std::move(span)).then([](
397 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
398 auto reply = replyFut.get();
399 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
401 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
402 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
404 errMsg <<
"Code:" << code <<
" Message:";
405 reply.header().has_exceptionmessage() ?
406 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
408 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
409 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
410 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
412 auto data = reply.data();
413 if (data.disable().has_exexceptionerr()) {
414 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.disable().exexceptionerr()));
416 return (std::string)data.disable().retval();
// Asynchronous GetState stub: sends the request and returns a future that
// yields the reply's string retval. NOTE(review): several original lines (the
// declarations around line 423 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
420 ::elt::mal::future<std::string>
GetState()
override {
// Allocates the GetState input message; the variable receiving it is not visible.
423 new ::generated::zpb::hellomalif::StdCmds_GetState_In();
// Span named "GetState", tagged with span_kind = rpc_client.
429 auto span = ::elt::mal::util::tracing::createSpan(
430 "GetState", {::opentracing::SetTag {
431 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
433 return sendRequest(request, std::move(span)).then([](
434 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
435 auto reply = replyFut.get();
436 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
438 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
439 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
441 errMsg <<
"Code:" << code <<
" Message:";
442 reply.header().has_exceptionmessage() ?
443 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
445 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
446 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
447 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
449 auto data = reply.data();
450 if (data.getstate().has_exexceptionerr()) {
451 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.getstate().exexceptionerr()));
453 return (std::string)data.getstate().retval();
// Body of the asynchronous GetStatus stub. NOTE(review): the method signature,
// the declarations around line 460 and all closing braces are elided from this
// excerpt; comments cover only the statements that are visible.
// Allocates the GetStatus input message; the variable receiving it is not visible.
460 new ::generated::zpb::hellomalif::StdCmds_GetStatus_In();
// Span named "GetStatus", tagged with span_kind = rpc_client.
466 auto span = ::elt::mal::util::tracing::createSpan(
467 "GetStatus", {::opentracing::SetTag {
468 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
470 return sendRequest(request, std::move(span)).then([](
471 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
472 auto reply = replyFut.get();
473 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
475 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
476 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
478 errMsg <<
"Code:" << code <<
" Message:";
479 reply.header().has_exceptionmessage() ?
480 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
482 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
483 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
484 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
486 auto data = reply.data();
487 if (data.getstatus().has_exexceptionerr()) {
488 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.getstatus().exexceptionerr()));
490 return (std::string)data.getstatus().retval();
// Body of the asynchronous GetVersion stub. NOTE(review): the method
// signature, the declarations around line 497 and all closing braces are
// elided from this excerpt; comments cover only the statements that are visible.
// Allocates the GetVersion input message; the variable receiving it is not visible.
497 new ::generated::zpb::hellomalif::StdCmds_GetVersion_In();
// Span named "GetVersion", tagged with span_kind = rpc_client.
503 auto span = ::elt::mal::util::tracing::createSpan(
504 "GetVersion", {::opentracing::SetTag {
505 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
507 return sendRequest(request, std::move(span)).then([](
508 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
509 auto reply = replyFut.get();
510 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
512 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
513 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
515 errMsg <<
"Code:" << code <<
" Message:";
516 reply.header().has_exceptionmessage() ?
517 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
519 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
520 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
521 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
523 auto data = reply.data();
524 if (data.getversion().has_exexceptionerr()) {
525 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.getversion().exexceptionerr()));
527 return (std::string)data.getversion().retval();
// Asynchronous Stop stub: sends the request and returns a future that yields
// the reply's string retval. NOTE(review): several original lines (the
// declarations around line 534 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
531 ::elt::mal::future<std::string>
Stop()
override {
// Allocates the Stop input message; the variable receiving it is not visible.
534 new ::generated::zpb::hellomalif::StdCmds_Stop_In();
// Span named "Stop", tagged with span_kind = rpc_client.
540 auto span = ::elt::mal::util::tracing::createSpan(
541 "Stop", {::opentracing::SetTag {
542 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
544 return sendRequest(request, std::move(span)).then([](
545 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
546 auto reply = replyFut.get();
547 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
549 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
550 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
552 errMsg <<
"Code:" << code <<
" Message:";
553 reply.header().has_exceptionmessage() ?
554 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
556 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
557 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
558 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
560 auto data = reply.data();
561 if (data.stop().has_exexceptionerr()) {
562 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.stop().exexceptionerr()));
564 return (std::string)data.stop().retval();
// Asynchronous Exit stub: sends the request and returns a future that yields
// the reply's string retval. NOTE(review): several original lines (the
// declarations around line 571 and all closing braces) are elided from this
// excerpt; comments cover only the statements that are visible.
568 ::elt::mal::future<std::string>
Exit()
override {
// Allocates the Exit input message; the variable receiving it is not visible.
571 new ::generated::zpb::hellomalif::StdCmds_Exit_In();
// Span named "Exit", tagged with span_kind = rpc_client.
577 auto span = ::elt::mal::util::tracing::createSpan(
578 "Exit", {::opentracing::SetTag {
579 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
581 return sendRequest(request, std::move(span)).then([](
582 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
583 auto reply = replyFut.get();
584 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
586 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
587 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
589 errMsg <<
"Code:" << code <<
" Message:";
590 reply.header().has_exceptionmessage() ?
591 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
593 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
594 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
595 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
597 auto data = reply.data();
598 if (data.exit().has_exexceptionerr()) {
599 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.exit().exexceptionerr()));
601 return (std::string)data.exit().retval();
// Asynchronous SetLogLevel stub: sends the request and returns a future that
// yields the reply's string retval. NOTE(review): several original lines are
// elided from this excerpt, including the code that transfers `info` into the
// request (lines 609-623) and all closing braces; comments cover only the
// statements that are visible.
605 ::elt::mal::future<std::string>
SetLogLevel(
const std::shared_ptr<::hellomalif::LogInfo>& info)
override {
// Allocates the SetLogLevel input message; the variable receiving it is not visible.
608 new ::generated::zpb::hellomalif::StdCmds_SetLogLevel_In();
// Span named "SetLogLevel", tagged with span_kind = rpc_client.
624 auto span = ::elt::mal::util::tracing::createSpan(
625 "SetLogLevel", {::opentracing::SetTag {
626 ::opentracing::ext::span_kind, ::opentracing::ext::span_kind_rpc_client }});
// Send the request with the span and decode the reply in a continuation.
628 return sendRequest(request, std::move(span)).then([](
629 ::elt::mal::future<::generated::zpb::hellomalif::StdCmds_Reply> replyFut) {
630 auto reply = replyFut.get();
631 auto code = reply.header().code();
// Any completion code other than COMPLETED is converted into an exception.
633 if (code != ::elt::mal::zpb::rr::CompletionCode::COMPLETED) {
634 std::ostringstream errMsg;
// Message layout: "Code:<code> Message:<remote message or fixed fallback>".
636 errMsg <<
"Code:" << code <<
" Message:";
637 reply.header().has_exceptionmessage() ?
638 errMsg << reply.header().exceptionmessage() : errMsg <<
"Unknown error message";
// UNKNOWN_EXCEPTION -> UnhandledRemoteException; every other code -> MalException.
640 code == ::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION ?
641 ::elt::mal::throw_exception(::elt::mal::UnhandledRemoteException(errMsg.str())) :
642 ::elt::mal::throw_exception(::elt::mal::MalException(errMsg.str()));
// Presumably reached only for COMPLETED replies (the closing brace of the
// check above is elided): a populated exexceptionerr is thrown as
// ExceptionErrImpl, otherwise retval is returned.
644 auto data = reply.data();
645 if (data.setloglevel().has_exexceptionerr()) {
646 ::elt::mal::throw_exception(::hellomalif::zpb::ExceptionErrImpl(data.setloglevel().exexceptionerr()));
648 return (std::string)data.setloglevel().retval();
// Constructor parameters of the blocking client wrapper (the constructor name
// is on an elided line): endpoint URI, standard QoS list, MAL-specific
// properties and the ZpbMal instance.
658 const ::elt::mal::Uri &uri,
659 const std::vector<std::shared_ptr<::elt::mal::rr::qos::QoS>> &standardQoS,
660 const ::elt::mal::Mal::Properties &malSpecificProperties,
661 const std::shared_ptr<::elt::mal::ZpbMal> &zpbMal)
// The delegate is built from the same arguments; m_timeoutMs is
// value-initialized here and then assigned in the body.
662 : m_delegate(uri, standardQoS, malSpecificProperties, zpbMal), m_timeoutMs{} {
// Reply timeout taken from the standard QoS list via getReplyTimeQoS().
663 m_timeoutMs = ::elt::mal::util::getReplyTimeQoS(standardQoS);
// Returns the Mal instance reported by the delegate.
666 std::shared_ptr<::elt::mal::Mal>
getMal()
const override {
667 return m_delegate.getMal();
// Blocking Init wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
671 auto future = m_delegate.
Init();
// Ready within the timeout: the statement executed in this case (line 674) is
// elided from this excerpt.
672 if (::elt::mal::future_status::ready
673 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
676 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking Reset wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
681 auto future = m_delegate.
Reset();
// Ready within the timeout: the statement executed in this case (line 684) is
// elided from this excerpt.
682 if (::elt::mal::future_status::ready
683 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
686 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking Enable wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
691 auto future = m_delegate.
Enable();
// Ready within the timeout: the statement executed in this case (line 694) is
// elided from this excerpt.
692 if (::elt::mal::future_status::ready
693 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
696 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking Disable wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
701 auto future = m_delegate.
Disable();
// Ready within the timeout: the statement executed in this case (line 704) is
// elided from this excerpt.
702 if (::elt::mal::future_status::ready
703 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
706 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking GetState wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
711 auto future = m_delegate.
GetState();
// Ready within the timeout: the statement executed in this case (line 714) is
// elided from this excerpt.
712 if (::elt::mal::future_status::ready
713 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
716 throw ::elt::mal::TimeoutException(
"timeout");
// Timeout check of a blocking wrapper whose signature and delegate call are
// elided from this excerpt. NOTE(review): by position this is presumably
// GetStatus — confirm in the full file.
// Waits up to m_timeoutMs milliseconds for `future` to become ready.
722 if (::elt::mal::future_status::ready
723 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
726 throw ::elt::mal::TimeoutException(
"timeout");
// Timeout check of a blocking wrapper whose signature and delegate call are
// elided from this excerpt. NOTE(review): by position this is presumably
// GetVersion — confirm in the full file.
// Waits up to m_timeoutMs milliseconds for `future` to become ready.
732 if (::elt::mal::future_status::ready
733 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
736 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking Stop wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
741 auto future = m_delegate.
Stop();
// Ready within the timeout: the statement executed in this case (line 744) is
// elided from this excerpt.
742 if (::elt::mal::future_status::ready
743 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
746 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking Exit wrapper (signature elided from this excerpt): starts the
// asynchronous call on the delegate and waits up to m_timeoutMs milliseconds.
751 auto future = m_delegate.
Exit();
// Ready within the timeout: the statement executed in this case (line 754) is
// elided from this excerpt.
752 if (::elt::mal::future_status::ready
753 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
756 throw ::elt::mal::TimeoutException(
"timeout");
// Blocking SetLogLevel wrapper returning the reply string. The statement that
// creates `future` (line 761) is elided from this excerpt; presumably it
// forwards `info` to the delegate's asynchronous SetLogLevel.
760 std::string
SetLogLevel(
const std::shared_ptr<::hellomalif::LogInfo>& info)
override {
// Waits up to m_timeoutMs milliseconds for `future` to become ready; the
// statement executed in that case (line 764) is elided.
762 if (::elt::mal::future_status::ready
763 == future.wait_for(boost::chrono::milliseconds(m_timeoutMs))) {
// Presumably reached only when the future was not ready in time.
766 throw ::elt::mal::TimeoutException(
"timeout");
// Connection management is forwarded to the delegate; the enclosing method
// signatures are elided from this excerpt.
777 return m_delegate.asyncConnect();
// Forwards listener registration to the delegate and returns its result.
781 return m_delegate.registerConnectionListener(listener);
// Constructor parameters of the server-side adapter (the constructor name is
// on an elided line): the ZpbServer, the AsyncStdCmds entity that implements
// the commands, and the Mal instance.
793 ::elt::mal::rr::ZpbServer &zpbServer,
794 const std::shared_ptr<::hellomalif::AsyncStdCmds> &entity,
795 const std::shared_ptr<::elt::mal::Mal> &mal)
// Initializes m_zpbServer, m_entity and m_mal from the arguments; empty body.
796 : m_zpbServer(zpbServer), m_entity(entity), m_mal(mal) {}
// Parameters of the request handler. Its name and first parameter (presumably
// the `socket` used below) are on a line elided from this excerpt.
800 const std::string &
id,
const std::string &topicIdFrame,
801 const std::string &headerFrame,
802 const ::elt::mal::zpb::rr::ZpbHeader &header) {
// getSocketHasRcvMore(socket) is false (presumably: no further message part is
// pending): reply with INVALID_ARGUMENT / "missing request frame" and a null
// StdCmds_Return payload.
803 if (!::elt::mal::util::getSocketHasRcvMore(socket)) {
804 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> unhandledExReplyHeader(
805 new ::elt::mal::zpb::rr::ReplyHeader());
807 unhandledExReplyHeader->set_code(::elt::mal::zpb::rr::CompletionCode::INVALID_ARGUMENT);
808 unhandledExReplyHeader->set_exceptionmessage(
"missing request frame");
810 sendReply(socket,
id, topicIdFrame, headerFrame, std::move(unhandledExReplyHeader),
811 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return>());
// Receive the request frame from the socket.
819 zmq::message_t msgFrame;
820 socket.recv(&msgFrame, 0);
// The frame does not parse into `request` (declared on an elided line): reply
// with INVALID_ARGUMENT and a null StdCmds_Return payload.
822 if (!request.ParseFromArray(msgFrame.data(), msgFrame.size())) {
823 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> unhandledExReplyHeader(
824 new ::elt::mal::zpb::rr::ReplyHeader());
826 unhandledExReplyHeader->set_code(::elt::mal::zpb::rr::CompletionCode::INVALID_ARGUMENT);
827 unhandledExReplyHeader->set_exceptionmessage(
"Invalid Request: Request parsing failed");
829 sendReply(socket,
id, topicIdFrame, headerFrame, std::move(unhandledExReplyHeader),
830 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return>());
// Dispatch branch for an Init call. NOTE(review): the reply-sending calls, the
// try/catch structure and several closing braces are on lines elided from this
// excerpt; comments cover only the statements that are visible.
837 if (call->has_init()) {
838 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::Init" and activate it on the scope manager.
839 if (header.has_trace()) {
840 auto spanUnique = ::elt::mal::util::tracing::createSpan(
841 "StdCmds::Init", header.trace(), ::opentracing::ext::span_kind_rpc_server);
842 span = std::move(spanUnique);
843 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
845 auto params_ = call->mutable_init();
// Create a ServerContext for this call and hand it to ServerContextProvider.
847 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
848 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
850 ::elt::mal::rr::ServerContextProvider::setInstance<
851 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
853 m_entity->Init().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
854 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in an Init_Out attached to the reply data.
855 if (!f.has_exception()) {
856 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
857 new ::generated::zpb::hellomalif::StdCmds_Return());
859 auto out = new ::generated::zpb::hellomalif::StdCmds_Init_Out();
861 out->set_retval(f.get());
// set_allocated_init passes ownership of `out` to `data`.
863 data->set_allocated_init(out);
864 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
868 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
871 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
875 auto out = new ::generated::zpb::hellomalif::StdCmds_Init_Out();
877 *out->mutable_exexceptionerr() = **&_ex;
879 data->set_allocated_init(out);
880 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
883 if (span) { span->Finish();}
885 headerFrame, f.get_exception_ptr());
889 }
// Dispatch branch for a Reset call. NOTE(review): the reply-sending calls, the
// try/catch structure and several closing braces are on lines elided from this
// excerpt; comments cover only the statements that are visible.
else if (call->has_reset()) {
890 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::Reset" and activate it on the scope manager.
891 if (header.has_trace()) {
892 auto spanUnique = ::elt::mal::util::tracing::createSpan(
893 "StdCmds::Reset", header.trace(), ::opentracing::ext::span_kind_rpc_server);
894 span = std::move(spanUnique);
895 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
897 auto params_ = call->mutable_reset();
// Create a ServerContext for this call and hand it to ServerContextProvider.
899 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
900 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
902 ::elt::mal::rr::ServerContextProvider::setInstance<
903 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
905 m_entity->Reset().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
906 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a Reset_Out attached to the reply data.
907 if (!f.has_exception()) {
908 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
909 new ::generated::zpb::hellomalif::StdCmds_Return());
911 auto out = new ::generated::zpb::hellomalif::StdCmds_Reset_Out();
913 out->set_retval(f.get());
// set_allocated_reset passes ownership of `out` to `data`.
915 data->set_allocated_reset(out);
916 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
920 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
923 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
927 auto out = new ::generated::zpb::hellomalif::StdCmds_Reset_Out();
929 *out->mutable_exexceptionerr() = **&_ex;
931 data->set_allocated_reset(out);
932 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
935 if (span) { span->Finish();}
937 headerFrame, f.get_exception_ptr());
941 }
// Dispatch branch for an Enable call. NOTE(review): the reply-sending calls,
// the try/catch structure and several closing braces are on lines elided from
// this excerpt; comments cover only the statements that are visible.
else if (call->has_enable()) {
942 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::Enable" and activate it on the scope manager.
943 if (header.has_trace()) {
944 auto spanUnique = ::elt::mal::util::tracing::createSpan(
945 "StdCmds::Enable", header.trace(), ::opentracing::ext::span_kind_rpc_server);
946 span = std::move(spanUnique);
947 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
949 auto params_ = call->mutable_enable();
// Create a ServerContext for this call and hand it to ServerContextProvider.
951 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
952 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
954 ::elt::mal::rr::ServerContextProvider::setInstance<
955 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
957 m_entity->Enable().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
958 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in an Enable_Out attached to the reply data.
959 if (!f.has_exception()) {
960 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
961 new ::generated::zpb::hellomalif::StdCmds_Return());
963 auto out = new ::generated::zpb::hellomalif::StdCmds_Enable_Out();
965 out->set_retval(f.get());
// set_allocated_enable passes ownership of `out` to `data`.
967 data->set_allocated_enable(out);
968 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
972 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
975 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
979 auto out = new ::generated::zpb::hellomalif::StdCmds_Enable_Out();
981 *out->mutable_exexceptionerr() = **&_ex;
983 data->set_allocated_enable(out);
984 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
987 if (span) { span->Finish();}
989 headerFrame, f.get_exception_ptr());
993 }
// Dispatch branch for a Disable call. NOTE(review): the reply-sending calls,
// the try/catch structure and several closing braces are on lines elided from
// this excerpt; comments cover only the statements that are visible.
else if (call->has_disable()) {
994 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::Disable" and activate it on the scope manager.
995 if (header.has_trace()) {
996 auto spanUnique = ::elt::mal::util::tracing::createSpan(
997 "StdCmds::Disable", header.trace(), ::opentracing::ext::span_kind_rpc_server);
998 span = std::move(spanUnique);
999 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1001 auto params_ = call->mutable_disable();
// Create a ServerContext for this call and hand it to ServerContextProvider.
1003 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1004 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1006 ::elt::mal::rr::ServerContextProvider::setInstance<
1007 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
1009 m_entity->Disable().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1010 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a Disable_Out attached to the reply data.
1011 if (!f.has_exception()) {
1012 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1013 new ::generated::zpb::hellomalif::StdCmds_Return());
1015 auto out = new ::generated::zpb::hellomalif::StdCmds_Disable_Out();
1017 out->set_retval(f.get());
// set_allocated_disable passes ownership of `out` to `data`.
1019 data->set_allocated_disable(out);
1020 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
1024 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
1027 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1031 auto out = new ::generated::zpb::hellomalif::StdCmds_Disable_Out();
1033 *out->mutable_exexceptionerr() = **&_ex;
1035 data->set_allocated_disable(out);
1036 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
1039 if (span) { span->Finish();}
1041 headerFrame, f.get_exception_ptr());
1045 }
// Dispatch branch for a GetState call. NOTE(review): the reply-sending calls,
// the try/catch structure and several closing braces are on lines elided from
// this excerpt; comments cover only the statements that are visible.
else if (call->has_getstate()) {
1046 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::GetState" and activate it on the scope manager.
1047 if (header.has_trace()) {
1048 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1049 "StdCmds::GetState", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1050 span = std::move(spanUnique);
1051 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1053 auto params_ = call->mutable_getstate();
// Create a ServerContext for this call and hand it to ServerContextProvider.
1055 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1056 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1058 ::elt::mal::rr::ServerContextProvider::setInstance<
1059 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
1061 m_entity->GetState().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1062 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a GetState_Out attached to the reply data.
1063 if (!f.has_exception()) {
1064 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1065 new ::generated::zpb::hellomalif::StdCmds_Return());
1067 auto out = new ::generated::zpb::hellomalif::StdCmds_GetState_Out();
1069 out->set_retval(f.get());
// set_allocated_getstate passes ownership of `out` to `data`.
1071 data->set_allocated_getstate(out);
1072 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
1076 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
1079 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1083 auto out = new ::generated::zpb::hellomalif::StdCmds_GetState_Out();
1085 *out->mutable_exexceptionerr() = **&_ex;
1087 data->set_allocated_getstate(out);
1088 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
1091 if (span) { span->Finish();}
1093 headerFrame, f.get_exception_ptr());
1097 }
// Dispatch branch for a GetStatus call. NOTE(review): the reply-sending calls,
// the try/catch structure and several closing braces are on lines elided from
// this excerpt; comments cover only the statements that are visible.
else if (call->has_getstatus()) {
1098 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::GetStatus" and activate it on the scope manager.
1099 if (header.has_trace()) {
1100 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1101 "StdCmds::GetStatus", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1102 span = std::move(spanUnique);
1103 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1105 auto params_ = call->mutable_getstatus();
// Create a ServerContext for this call and hand it to ServerContextProvider.
1107 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1108 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1110 ::elt::mal::rr::ServerContextProvider::setInstance<
1111 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
1113 m_entity->GetStatus().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1114 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a GetStatus_Out attached to the reply data.
1115 if (!f.has_exception()) {
1116 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1117 new ::generated::zpb::hellomalif::StdCmds_Return());
1119 auto out = new ::generated::zpb::hellomalif::StdCmds_GetStatus_Out();
1121 out->set_retval(f.get());
// set_allocated_getstatus passes ownership of `out` to `data`.
1123 data->set_allocated_getstatus(out);
1124 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
1128 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
1131 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1135 auto out = new ::generated::zpb::hellomalif::StdCmds_GetStatus_Out();
1137 *out->mutable_exexceptionerr() = **&_ex;
1139 data->set_allocated_getstatus(out);
1140 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
1143 if (span) { span->Finish();}
1145 headerFrame, f.get_exception_ptr());
1149 }
// Dispatch branch for a GetVersion call. NOTE(review): the reply-sending
// calls, the try/catch structure and several closing braces are on lines
// elided from this excerpt; comments cover only the statements that are visible.
else if (call->has_getversion()) {
1150 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::GetVersion" and activate it on the scope manager.
1151 if (header.has_trace()) {
1152 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1153 "StdCmds::GetVersion", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1154 span = std::move(spanUnique);
1155 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1157 auto params_ = call->mutable_getversion();
// Create a ServerContext for this call and hand it to ServerContextProvider.
1159 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1160 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1162 ::elt::mal::rr::ServerContextProvider::setInstance<
1163 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
1165 m_entity->GetVersion().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1166 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a GetVersion_Out attached to the reply data.
1167 if (!f.has_exception()) {
1168 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1169 new ::generated::zpb::hellomalif::StdCmds_Return());
1171 auto out = new ::generated::zpb::hellomalif::StdCmds_GetVersion_Out();
1173 out->set_retval(f.get());
// set_allocated_getversion passes ownership of `out` to `data`.
1175 data->set_allocated_getversion(out);
1176 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
1180 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
1183 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1187 auto out = new ::generated::zpb::hellomalif::StdCmds_GetVersion_Out();
1189 *out->mutable_exexceptionerr() = **&_ex;
1191 data->set_allocated_getversion(out);
1192 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
1195 if (span) { span->Finish();}
1197 headerFrame, f.get_exception_ptr());
1201 }
// Dispatch branch for a Stop call. NOTE(review): the reply-sending calls, the
// try/catch structure and several closing braces are on lines elided from this
// excerpt; comments cover only the statements that are visible.
else if (call->has_stop()) {
1202 auto span = std::shared_ptr<::opentracing::Span>();
// Only when the header carries trace data: create a server-side span named
// "StdCmds::Stop" and activate it on the scope manager.
1203 if (header.has_trace()) {
1204 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1205 "StdCmds::Stop", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1206 span = std::move(spanUnique);
1207 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1209 auto params_ = call->mutable_stop();
// Create a ServerContext for this call and hand it to ServerContextProvider.
1211 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1212 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1214 ::elt::mal::rr::ServerContextProvider::setInstance<
1215 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
// Invoke the entity; the continuation is scheduled on the ZpbServer executor.
// NOTE(review): `socket` is captured by reference — confirm that it outlives
// the continuation.
1217 m_entity->Stop().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1218 (::elt::mal::future<std::string> f) {
// Success: wrap the returned string in a Stop_Out attached to the reply data.
1219 if (!f.has_exception()) {
1220 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1221 new ::generated::zpb::hellomalif::StdCmds_Return());
1223 auto out = new ::generated::zpb::hellomalif::StdCmds_Stop_Out();
1225 out->set_retval(f.get());
// set_allocated_stop passes ownership of `out` to `data`.
1227 data->set_allocated_stop(out);
1228 if (span) { span->Finish();}
// Failure: rethrow the stored exception (presumably inside a try block whose
// handlers are on elided lines).
1232 ::elt::mal::rethrow_exception(f.get_exception_ptr());
// Presumably the handler for the declared exception type (`_ex` is declared on
// an elided line): its protobuf payload is copied into exexceptionerr.
1235 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1239 auto out = new ::generated::zpb::hellomalif::StdCmds_Stop_Out();
1241 *out->mutable_exexceptionerr() = **&_ex;
1243 data->set_allocated_stop(out);
1244 if (span) { span->Finish();}
// Presumably the fallback for any other exception: the exception pointer is
// forwarded to a call whose name is on an elided line.
1247 if (span) { span->Finish();}
1249 headerFrame, f.get_exception_ptr());
1253 }
else if (call->has_exit()) {
1254 auto span = std::shared_ptr<::opentracing::Span>();
1255 if (header.has_trace()) {
1256 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1257 "StdCmds::Exit", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1258 span = std::move(spanUnique);
1259 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1261 auto params_ = call->mutable_exit();
1263 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1264 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1266 ::elt::mal::rr::ServerContextProvider::setInstance<
1267 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
1269 m_entity->Exit().then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1270 (::elt::mal::future<std::string> f) {
1271 if (!f.has_exception()) {
1272 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1273 new ::generated::zpb::hellomalif::StdCmds_Return());
1275 auto out = new ::generated::zpb::hellomalif::StdCmds_Exit_Out();
1277 out->set_retval(f.get());
1279 data->set_allocated_exit(out);
1280 if (span) { span->Finish();}
1284 ::elt::mal::rethrow_exception(f.get_exception_ptr());
1287 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1291 auto out = new ::generated::zpb::hellomalif::StdCmds_Exit_Out();
1293 *out->mutable_exexceptionerr() = **&_ex;
1295 data->set_allocated_exit(out);
1296 if (span) { span->Finish();}
1299 if (span) { span->Finish();}
1301 headerFrame, f.get_exception_ptr());
1305 }
else if (call->has_setloglevel()) {
1306 auto span = std::shared_ptr<::opentracing::Span>();
1307 if (header.has_trace()) {
1308 auto spanUnique = ::elt::mal::util::tracing::createSpan(
1309 "StdCmds::SetLogLevel", header.trace(), ::opentracing::ext::span_kind_rpc_server);
1310 span = std::move(spanUnique);
1311 ::elt::mal::util::tracing::ScopeManagerFactory::getScopeManager().activate(span);
1313 auto params_ = call->mutable_setloglevel();
1315 auto serverContext = std::unique_ptr<::elt::mal::rr::ServerContext<std::string>>(
1316 new ::elt::mal::rr::ServerContextZpb<std::string>(m_mal));
1318 ::elt::mal::rr::ServerContextProvider::setInstance<
1319 ::elt::mal::rr::ServerContext<std::string>>(serverContext);
1321 m_entity->SetLogLevel(std::shared_ptr< ::hellomalif::LogInfo>(new ::hellomalif::zpb::LogInfoImpl(params_->mutable_info()))).then(m_zpbServer.getExecutor(), [
this, &socket, id, topicIdFrame, headerFrame, params_, span]
1322 (::elt::mal::future<std::string> f) {
1323 if (!f.has_exception()) {
1324 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(
1325 new ::generated::zpb::hellomalif::StdCmds_Return());
1327 auto out = new ::generated::zpb::hellomalif::StdCmds_SetLogLevel_Out();
1329 out->set_retval(f.get());
1331 data->set_allocated_setloglevel(out);
1332 if (span) { span->Finish();}
1336 ::elt::mal::rethrow_exception(f.get_exception_ptr());
1339 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data(new ::generated::zpb::hellomalif::StdCmds_Return());
1343 auto out = new ::generated::zpb::hellomalif::StdCmds_SetLogLevel_Out();
1345 *out->mutable_exexceptionerr() = **&_ex;
1347 data->set_allocated_setloglevel(out);
1348 if (span) { span->Finish();}
1351 if (span) { span->Finish();}
1353 headerFrame, f.get_exception_ptr());
1358 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> unhandledExReplyHeader(
1359 new ::elt::mal::zpb::rr::ReplyHeader());
1361 unhandledExReplyHeader->set_code(::elt::mal::zpb::rr::CompletionCode::UNKNOWN_OPERATION);
1363 sendReply(socket,
id, topicIdFrame, headerFrame, std::move(unhandledExReplyHeader),
1364 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return>());
// Parameter list and visible body of the reply-sending helper that takes an
// explicit reply header (this matches the six-argument sendReply(...) calls
// made elsewhere in this file).
//   socket, id, topicIdFrame, headerFrame - passed through unchanged to
//                                           m_zpbServer.sendReply() below.
//   replyHeader, data                     - std::unique_ptr taken by value, so
//                                           the caller hands over ownership.
// NOTE(review): the statements that construct `reply` from replyHeader/data
// are not visible in this excerpt - confirm against the full file.
1369 zmq::socket_t &socket,
const std::string &
id,
const std::string &topicIdFrame,
1370 const std::string &headerFrame,
1371 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> replyHeader,
1372 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data) {
// Serialize the reply object to a string ...
1381 auto replyStr = reply.SerializeAsString();
// ... and pass it, together with the routing arguments, to the ZPB server.
1383 m_zpbServer.sendReply(socket,
id, topicIdFrame, headerFrame, replyStr);
// Parameter list and body of the helper that sends a reply marked COMPLETED.
//   socket, id, topicIdFrame, headerFrame - forwarded unchanged to sendReply().
//   data - std::unique_ptr taken by value and moved on to sendReply().
1387 zmq::socket_t &socket,
const std::string &
id,
1388 const std::string &topicIdFrame,
const std::string &headerFrame,
1389 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return> data) {
// Create a fresh reply header and set its completion code to COMPLETED.
1390 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> completedReplyHeader(
1391 new ::elt::mal::zpb::rr::ReplyHeader());
1393 completedReplyHeader->set_code(::elt::mal::zpb::rr::CompletionCode::COMPLETED);
// Delegate to the header-taking sendReply, moving both header and payload.
1395 sendReply(socket,
id, topicIdFrame, headerFrame,
1396 std::move(completedReplyHeader), std::move(data));
// Parameter list and visible body of the helper that reports an exception to
// the requester with completion code UNKNOWN_EXCEPTION.
//   socket, id, topicIdFrame, headerFrame - forwarded unchanged to sendReply().
//   exceptionPtr - the exception to report; it is rethrown below so that its
//                  message can be read.
1400 zmq::socket_t &socket,
const std::string &
id,
const std::string &topicIdFrame,
1401 const std::string &headerFrame, ::elt::mal::exception_ptr exceptionPtr) {
// Create a fresh reply header and set its completion code to UNKNOWN_EXCEPTION.
1402 std::unique_ptr<::elt::mal::zpb::rr::ReplyHeader> unhandledExReplyHeader(
1403 new ::elt::mal::zpb::rr::ReplyHeader());
1405 unhandledExReplyHeader->set_code(::elt::mal::zpb::rr::CompletionCode::UNKNOWN_EXCEPTION);
// Initial message text; it is overwritten only in the std::exception handler
// below.
1407 std::string exceptionMessage =
"Unknown";
// Rethrow the stored exception so the handler below can inspect it.
// NOTE(review): the opening `try {` is not visible in this excerpt, nor is any
// handling of exceptions not derived from std::exception - confirm against the
// full file.
1410 ::elt::mal::rethrow_exception(exceptionPtr);
1411 }
catch (std::exception &e) {
// Use the exception's what() text as the reported message.
1412 exceptionMessage = e.what();
1415 unhandledExReplyHeader->set_exceptionmessage(exceptionMessage);
// Send the header with a default-constructed (null) StdCmds_Return payload.
1417 sendReply(socket,
id, topicIdFrame, headerFrame, std::move(unhandledExReplyHeader),
1418 std::unique_ptr<::generated::zpb::hellomalif::StdCmds_Return>());
// ZPB server (held by reference) that supplies the executor for the
// continuations and performs the final sendReply().
1422 ::elt::mal::rr::ZpbServer &m_zpbServer;
// Implementation object whose asynchronous command methods (e.g. Stop(),
// Exit(), SetLogLevel()) are invoked for incoming requests.
1423 std::shared_ptr<::hellomalif::AsyncStdCmds> m_entity;
// MAL instance passed to each newly created ServerContextZpb.
// NOTE(review): this is a reference to a shared_ptr, not a copy, so it does
// not share ownership; the referenced shared_ptr must outlive this object -
// confirm at the construction site.
1424 const std::shared_ptr<::elt::mal::Mal>& m_mal;