ifw-ccf 5.0.2
Loading...
Searching...
No Matches
messageBus.hpp
Go to the documentation of this file.
1
5#ifndef CCF_MPTK_MESSAGE_BUS_HPP_
6#define CCF_MPTK_MESSAGE_BUS_HPP_
7
8#include <string>
9#include <queue>
10#include <iostream>
11
12// TODO: Use C++20 std::counting_semaphore, when available.
13//#include <boost/interprocess/sync/interprocess_semaphore.hpp>
14
15#include <ifw/core/utils/base/base.hpp>
16#include <ifw/core/utils/time/time.hpp>
17
20
21
22// TODO: Dummy implementation. Should use C++20 counting semaphore.
24 public:
25
27 m_count = 0;
28 };
29
31 // Release all blocked on the semaphore?
32 };
33
34 void release() {
35 m_count_mutex.lock(); {
36 m_count++;
37 } m_count_mutex.unlock();
38 };
39
40 void acquire() {
41 m_acquire_mutex.lock(); {
42
43 while (m_count == 0) {
44 ifw::core::utils::time::Sleep(0.0001);
45 }
46
47 m_count_mutex.lock(); {
48 m_count--;
49 } m_count_mutex.unlock();
50
51 } m_acquire_mutex.unlock();
52 };
53
54 bool try_acquire_for(const double time) {
55 m_acquire_mutex.lock(); {
56
57 double start_time = ifw::core::utils::time::Time();
58 while ((m_count == 0) && ((ifw::core::utils::time::Time() - start_time) < time)) {
59 ifw::core::utils::time::Sleep(0.0001);
60 }
61 if ((ifw::core::utils::time::Time() - start_time) >= time) {
62 m_acquire_mutex.unlock();
63 return false;
64 }
65
66 m_count_mutex.lock(); {
67 m_count--;
68 } m_count_mutex.unlock();
69
70 } m_acquire_mutex.unlock();
71
72 return true;
73 };
74
75 private:
76 std::mutex m_acquire_mutex;
77 std::mutex m_count_mutex;
78 int32_t m_count;
79};
80
81
82namespace ifw::ccf::mptk {
83
84 void SplitDblTime(const double time,
85 int64_t& secs,
86 int64_t& nano_secs);
87
91 class MessageBus {
92 public:
93
94 MessageBus();
95
97
99 void Reset();
100
102 MessageBus& RegisterThread(const std::string& thread_name);
103
105 bool ThreadRegistered(const std::string& thread_name) const;
106
108 void SendMessage(const Message& message);
109
118 bool ReceiveMessage(const std::string& receiver_thread_name,
119 const double timeout,
120 bool& timed_out,
121 Message& message);
122
124 void SendResponse(Response& response);
125
128 bool ReceiveResponse(const std::string& msg_sender_thread_name,
129 const double timeout,
130 bool& timed_out,
131 Response& response);
132
135 bool ReceiveResponseByMsgId(const std::string& msg_id,
136 const double timeout,
137 bool& timed_out,
138 Response& response);
139
141 std::string ToString() const;
142
143 private:
144 std::string m_id; // Message Bus ID.
145
146 // Message Receiver Thread ID to message objects list.
147 std::map<std::string, std::list<Message>> m_message_registry;
148 // Message Receiver Registry counting semaphores.
149 // boost: std::map<std::string, boost::interprocess::interprocess_semaphore*> m_message_registry_sem;
150 std::map<std::string, TmpCountingSem*> m_message_registry_sem;
151
152 // Response receiver (= sender of msg) Thread ID to response objects list.
153 std::map<std::string, std::list<Response>> m_response_registry;
154 // Response Receiver Registry counting semaphores.
155 // boost: std::map<std::string, boost::interprocess::interprocess_semaphore*> m_response_registry_sem;
156 std::map<std::string, TmpCountingSem*> m_response_registry_sem;
157
158 // Message ID to response objects list.
159 std::map<std::string, Response*> m_response_msg_id_registry;
160
161 std::vector<std::string> m_threads;
162 };
163
164}
165
166#endif // CCF_MPTK_MESSAGE_BUS_HPP_
Definition messageBus.hpp:23
TmpCountingSem()
Definition messageBus.hpp:26
void release()
Definition messageBus.hpp:34
bool try_acquire_for(const double time)
Definition messageBus.hpp:54
~TmpCountingSem()
Definition messageBus.hpp:30
void acquire()
Definition messageBus.hpp:40
IFW CTD Multiprocessing Toolkit Message Bus.
Definition messageBus.hpp:91
std::string ToString() const
Generate ASCII output providing a status of the object (to the extend possible).
void SendMessage(const Message &message)
Send a message on the Message Bus.
Definition messageBus.cpp:82
MessageBus()
Definition messageBus.cpp:27
bool ThreadRegistered(const std::string &thread_name) const
Check if thread has been registered as participant in the Message Bus instance.
Definition messageBus.cpp:56
~MessageBus()
Definition messageBus.cpp:35
void Reset()
Reset the internal message queues and other objects.
Definition messageBus.cpp:45
MessageBus & RegisterThread(const std::string &thread_name)
Register thread which will send/receive messages on the Message Bus.
Definition messageBus.cpp:64
bool ReceiveMessage(const std::string &receiver_thread_name, const double timeout, bool &timed_out, Message &message)
Check for a message for this thread. Returns true if message available.
Definition messageBus.cpp:100
bool ReceiveResponseByMsgId(const std::string &msg_id, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition messageBus.cpp:212
void SendResponse(Response &response)
Send a response to a message received on the Message Bus.
Definition messageBus.cpp:149
bool ReceiveResponse(const std::string &msg_sender_thread_name, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition messageBus.cpp:162
IFW CTD Multiprocessing Toolkit Message class.
Definition message.hpp:24
IFW CTD Multiprocessing Toolkit Response class.
Definition response.hpp:20
Definition manager.hpp:15
void SplitDblTime(const double time, int64_t &secs, int64_t &nano_secs)
Definition messageBus.cpp:20