ifw-ccf  3.0.0-pre2
messageBus.hpp
Go to the documentation of this file.
1 
5 #ifndef CCF_MPTK_MESSAGE_BUS_HPP_
6 #define CCF_MPTK_MESSAGE_BUS_HPP_
7 
8 #include <string>
9 #include <queue>
10 #include <iostream>
11 
12 // TODO: Use C++20 std::counting_semaphore, when available.
13 //#include <boost/interprocess/sync/interprocess_semaphore.hpp>
14 
15 #include <core/utils/base/base.hpp>
16 #include <core/utils/time/time.hpp>
17 
18 #include <ccf/mptk/message.hpp>
19 #include <ccf/mptk/response.hpp>
20 
21 
22 // TODO: Dummy implementation. Should use C++20 counting semaphore.
24  public:
25 
27  m_count = 0;
28  };
29 
31  // Release all blocked on the semaphore?
32  };
33 
34  void release() {
35  m_count_mutex.lock(); {
36  m_count++;
37  } m_count_mutex.unlock();
38  };
39 
40  void acquire() {
41  m_acquire_mutex.lock(); {
42 
43  while (m_count == 0) {
44  core::utils::time::Sleep(0.0001);
45  }
46 
47  m_count_mutex.lock(); {
48  m_count--;
49  } m_count_mutex.unlock();
50 
51  } m_acquire_mutex.unlock();
52  };
53 
54  bool try_acquire_for(const double time) {
55  m_acquire_mutex.lock(); {
56 
57  double start_time = core::utils::time::Time();
58  while ((m_count == 0) && ((core::utils::time::Time() - start_time) < time)) {
59  core::utils::time::Sleep(0.0001);
60  }
61  if ((core::utils::time::Time() - start_time) >= time) {
62  m_acquire_mutex.unlock();
63  return false;
64  }
65 
66  m_count_mutex.lock(); {
67  m_count--;
68  } m_count_mutex.unlock();
69 
70  } m_acquire_mutex.unlock();
71 
72  return true;
73  };
74 
75  private:
76  std::mutex m_acquire_mutex;
77  std::mutex m_count_mutex;
78  int32_t m_count;
79 };
80 
81 
82 namespace ccf::mptk {
83 
84  void SplitDblTime(const double time,
85  int64_t& secs,
86  int64_t& nano_secs);
87 
91  class MessageBus {
92  public:
93 
94  MessageBus();
95 
96  ~MessageBus();
97 
99  void Reset();
100 
102  MessageBus& RegisterThread(const std::string& thread_name);
103 
105  bool ThreadRegistered(const std::string& thread_name) const;
106 
108  void SendMessage(const Message& message);
109 
118  bool ReceiveMessage(const std::string& receiver_thread_name,
119  const double timeout,
120  bool& timed_out,
121  Message& message);
122 
124  void SendResponse(Response& response);
125 
128  bool ReceiveResponse(const std::string& msg_sender_thread_name,
129  const double timeout,
130  bool& timed_out,
131  Response& response);
132 
135  bool ReceiveResponseByMsgId(const std::string& msg_id,
136  const double timeout,
137  bool& timed_out,
138  Response& response);
139 
141  std::string ToString() const;
142 
143  private:
144  std::string m_id; // Message Bus ID.
145 
146  // Message Receiver Thread ID to message objects list.
147  std::map<std::string, std::list<Message>> m_message_registry;
148  // Message Receiver Registry counting semaphores.
149  // boost: std::map<std::string, boost::interprocess::interprocess_semaphore*> m_message_registry_sem;
150  std::map<std::string, TmpCountingSem*> m_message_registry_sem;
151 
152  // Response receiver (= sender of msg) Thread ID to response objects list.
153  std::map<std::string, std::list<Response>> m_response_registry;
154  // Response Receiver Registry counting semaphores.
155  // boost: std::map<std::string, boost::interprocess::interprocess_semaphore*> m_response_registry_sem;
156  std::map<std::string, TmpCountingSem*> m_response_registry_sem;
157 
158  // Message ID to response objects list.
159  std::map<std::string, Response*> m_response_msg_id_registry;
160 
161  std::vector<std::string> m_threads;
162  };
163 
164 }
165 
166 #endif // CCF_MPTK_MESSAGE_BUS_HPP_
Definition: messageBus.hpp:23
TmpCountingSem()
Definition: messageBus.hpp:26
void release()
Definition: messageBus.hpp:34
bool try_acquire_for(const double time)
Definition: messageBus.hpp:54
~TmpCountingSem()
Definition: messageBus.hpp:30
void acquire()
Definition: messageBus.hpp:40
IFW CTD Multiprocessing Toolkit Message Bus.
Definition: messageBus.hpp:91
bool ReceiveMessage(const std::string &receiver_thread_name, const double timeout, bool &timed_out, Message &message)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:94
bool ReceiveResponseByMsgId(const std::string &msg_id, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:210
MessageBus & RegisterThread(const std::string &thread_name)
Register thread which will send/receive messages on the Message Bus.
Definition: messageBus.cpp:64
void SendMessage(const Message &message)
Send a message on the Message Bus.
Definition: messageBus.cpp:82
~MessageBus()
Definition: messageBus.cpp:35
std::string ToString() const
Generate ASCII output providing a status of the object (to the extend possible).
bool ThreadRegistered(const std::string &thread_name) const
Check if thread has been registered as participant in the Message Bus instance.
Definition: messageBus.cpp:56
void Reset()
Reset the internal message queues and other objects.
Definition: messageBus.cpp:45
MessageBus()
Definition: messageBus.cpp:27
void SendResponse(Response &response)
Send a response to a message received on the Message Bus.
Definition: messageBus.cpp:147
bool ReceiveResponse(const std::string &msg_sender_thread_name, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:160
IFW CTD Multiprocessing Toolkit Message class.
Definition: message.hpp:19
IFW CTD Multiprocessing Toolkit Response class.
Definition: response.hpp:20
Definition: manager.hpp:15
void SplitDblTime(const double time, int64_t &secs, int64_t &nano_secs)
Definition: messageBus.cpp:20