ifw-ccf  2.0.0
messageBus.hpp
Go to the documentation of this file.
1 
5 #ifndef CCF_MPTK_MESSAGE_BUS_HPP_
6 #define CCF_MPTK_MESSAGE_BUS_HPP_
7 
8 #include <string>
9 #include <queue>
10 #include <iostream>
11 
12 // TODO: Use C++20 std::counting_semaphore, when available.
13 #include <boost/interprocess/sync/interprocess_semaphore.hpp>
14 
15 #include <core/utils/base/base.hpp>
16 
17 #include <ccf/mptk/message.hpp>
18 #include <ccf/mptk/response.hpp>
19 
20 
21 namespace ccf::mptk {
22 
/// Split a time value given as a double into two integer output components.
///
/// @param time      Input time value.
/// @param secs      Output: first integer component.
/// @param nano_secs Output: second integer component.
///
/// NOTE(review): going by the parameter names, `time` is presumably in seconds and is
/// split into whole seconds (`secs`) plus the fractional remainder in nanoseconds
/// (`nano_secs`). The definition is not visible from this header - confirm.
23  void SplitDblTime(const double time,
24  int64_t& secs,
25  int64_t& nano_secs);
26 
/// IFW CTD Multiprocessing Toolkit Message Bus.
///
/// Keeps, per thread ID, a list of Message objects and a list of Response objects, each
/// paired with a counting semaphore, plus a lookup of responses by message ID (see the
/// private members at the end of the class).
30  class MessageBus {
31  public:
32 
    /// Construct a Message Bus instance.
33  MessageBus();
34 
    /// Destroy the Message Bus instance.
35  ~MessageBus();
36 
    /// Reset the internal message queues and other objects.
38  void Reset();
39 
    /// Register thread which will send/receive messages on the Message Bus.
    /// @param thread_id ID of the thread to register.
    /// @return Reference to a MessageBus (presumably this instance, to allow chaining - confirm).
41  MessageBus& RegisterThread(const std::string& thread_id);
42 
    /// Check if thread has been registered as participant in the Message Bus instance.
    /// @param thread_id ID of the thread to look up.
44  bool ThreadRegistered(const std::string& thread_id) const;
45 
    /// Send a message on the Message Bus.
    /// @param message Message to send.
47  void SendMessage(const Message& message);
48 
    /// Check for a message for this thread. Returns true if message available.
    /// @param receiver_thread_id ID of the thread receiving the message.
    /// @param timeout            Timeout to apply (presumably in seconds, cf. SplitDblTime - confirm).
    /// @param timed_out          Output flag; by its name, reports whether the timeout expired.
    /// @param message            Output: receives the message.
57  bool ReceiveMessage(const std::string& receiver_thread_id,
58  const double timeout,
59  bool& timed_out,
60  Message& message);
61 
    /// Send a response to a message received on the Message Bus.
    /// @param response Response to send. Taken by non-const reference, so the callee may
    ///                 modify it (whether it does is not visible from this header).
63  void SendResponse(Response& response);
64 
    /// Check for a response for the thread that sent the message.
    /// NOTE(review): the generated documentation describes this as "Check for a message for
    /// this thread. Returns true if message available."; given the Response output parameter
    /// this presumably means a response - confirm.
    /// @param msg_sender_thread_id ID of the thread that sent the original message.
    /// @param timeout              Timeout to apply (presumably in seconds - confirm).
    /// @param timed_out            Output flag; by its name, reports whether the timeout expired.
    /// @param response             Output: receives the response.
67  bool ReceiveResponse(const std::string& msg_sender_thread_id,
68  const double timeout,
69  bool& timed_out,
70  Response& response);
71 
    /// Check for a response, looked up by message ID rather than by thread ID.
    /// NOTE(review): the generated documentation reuses the ReceiveMessage wording here;
    /// the exact return/timeout contract should be confirmed against messageBus.cpp.
    /// @param msg_id    ID of the message whose response is requested.
    /// @param timeout   Timeout to apply (presumably in seconds - confirm).
    /// @param timed_out Output flag; by its name, reports whether the timeout expired.
    /// @param response  Output: receives the response.
74  bool ReceiveResponseByMsgId(const std::string& msg_id,
75  const double timeout,
76  bool& timed_out,
77  Response& response);
78 
    /// Generate ASCII output providing a status of the object (to the extent possible).
80  std::string ToString() const;
81 
82  private:
    // NOTE(review): std::map, std::list and std::vector are used below, but <map>, <list>
    // and <vector> are not among the headers this file includes directly; they are
    // presumably pulled in transitively - consider including them explicitly.
83  std::string m_id; // Message Bus ID.
84 
85  // Message receiver thread ID -> list of message objects for that thread.
86  std::map<std::string, std::list<Message>> m_message_registry;
87  // Message receiver thread ID -> counting semaphore for the message registry.
    // NOTE(review): raw pointers; who allocates and frees the semaphores is not visible
    // from this header - confirm that the destructor/Reset() release them.
88  std::map<std::string, boost::interprocess::interprocess_semaphore*> m_message_registry_sem;
89 
90  // Response receiver (= sender of msg) thread ID -> list of response objects.
91  std::map<std::string, std::list<Response>> m_response_registry;
92  // Response receiver thread ID -> counting semaphore for the response registry.
    // NOTE(review): raw pointers, same ownership question as for the message semaphores.
93  std::map<std::string, boost::interprocess::interprocess_semaphore*> m_response_registry_sem;
94 
95  // Message ID -> pointer to a response object (a single pointer per ID, not a list).
    // NOTE(review): what the pointer refers to and who owns it is not visible here - confirm.
96  std::map<std::string, Response*> m_response_msg_id_registry;
97 
    // Presumably the IDs of the threads registered via RegisterThread() - confirm.
98  std::vector<std::string> m_threads;
99  };
100 
101 }
102 
103 #endif // CCF_MPTK_MESSAGE_BUS_HPP_
ccf::mptk::MessageBus::RegisterThread
MessageBus & RegisterThread(const std::string &thread_id)
Register thread which will send/receive messages on the Message Bus.
Definition: messageBus.cpp:64
ccf::mptk
Definition: manager.hpp:15
message.hpp
ccf::mptk::MessageBus::MessageBus
MessageBus()
Definition: messageBus.cpp:27
ccf::mptk::MessageBus::ReceiveResponseByMsgId
bool ReceiveResponseByMsgId(const std::string &msg_id, const double timeout, bool &timed_out, Response &response)
Check for a response to the message with the given ID. Returns true if response available.
Definition: messageBus.cpp:206
ccf::mptk::MessageBus::~MessageBus
~MessageBus()
Definition: messageBus.cpp:35
ccf::mptk::SplitDblTime
void SplitDblTime(const double time, int64_t &secs, int64_t &nano_secs)
Definition: messageBus.cpp:20
ccf::mptk::MessageBus::SendResponse
void SendResponse(Response &response)
Send a response to a message received on the Message Bus.
Definition: messageBus.cpp:144
response.hpp
ccf::mptk::MessageBus::Reset
void Reset()
Reset the internal message queues and other objects.
Definition: messageBus.cpp:45
ccf::mptk::MessageBus
IFW CTD Multiprocessing Toolkit Message Bus.
Definition: messageBus.hpp:30
ccf::mptk::Message
IFW CTD Multiprocessing Toolkit Message class.
Definition: message.hpp:19
ccf::mptk::Response
IFW CTD Multiprocessing Toolkit Response class.
Definition: response.hpp:20
ccf::mptk::MessageBus::ReceiveResponse
bool ReceiveResponse(const std::string &msg_sender_thread_id, const double timeout, bool &timed_out, Response &response)
Check for a response for this thread. Returns true if response available.
Definition: messageBus.cpp:155
ccf::mptk::MessageBus::SendMessage
void SendMessage(const Message &message)
Send a message on the Message Bus.
Definition: messageBus.cpp:80
ccf::mptk::MessageBus::ToString
std::string ToString() const
Generate ASCII output providing a status of the object (to the extent possible).
ccf::mptk::MessageBus::ThreadRegistered
bool ThreadRegistered(const std::string &thread_id) const
Check if thread has been registered as participant in the Message Bus instance.
Definition: messageBus.cpp:56
ccf::mptk::MessageBus::ReceiveMessage
bool ReceiveMessage(const std::string &receiver_thread_id, const double timeout, bool &timed_out, Message &message)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:91