5 #ifndef CCF_MPTK_MESSAGE_BUS_HPP_
6 #define CCF_MPTK_MESSAGE_BUS_HPP_
15 #include <core/utils/base/base.hpp>
16 #include <core/utils/time/time.hpp>
35 m_count_mutex.lock(); {
37 } m_count_mutex.unlock();
41 m_acquire_mutex.lock(); {
43 while (m_count == 0) {
44 core::utils::time::Sleep(0.0001);
47 m_count_mutex.lock(); {
49 } m_count_mutex.unlock();
51 } m_acquire_mutex.unlock();
55 m_acquire_mutex.lock(); {
57 double start_time = core::utils::time::Time();
58 while ((m_count == 0) && ((core::utils::time::Time() - start_time) < time)) {
59 core::utils::time::Sleep(0.0001);
61 if ((core::utils::time::Time() - start_time) >= time) {
62 m_acquire_mutex.unlock();
66 m_count_mutex.lock(); {
68 } m_count_mutex.unlock();
70 } m_acquire_mutex.unlock();
76 std::mutex m_acquire_mutex;
77 std::mutex m_count_mutex;
119 const double timeout,
129 const double timeout,
136 const double timeout,
147 std::map<std::string, std::list<Message>> m_message_registry;
150 std::map<std::string, TmpCountingSem*> m_message_registry_sem;
153 std::map<std::string, std::list<Response>> m_response_registry;
156 std::map<std::string, TmpCountingSem*> m_response_registry_sem;
159 std::map<std::string, Response*> m_response_msg_id_registry;
161 std::vector<std::string> m_threads;
Definition: messageBus.hpp:23
TmpCountingSem()
Definition: messageBus.hpp:26
void release()
Definition: messageBus.hpp:34
bool try_acquire_for(const double time)
Definition: messageBus.hpp:54
~TmpCountingSem()
Definition: messageBus.hpp:30
void acquire()
Definition: messageBus.hpp:40
IFW CTD Multiprocessing Toolkit Message Bus.
Definition: messageBus.hpp:91
bool ReceiveMessage(const std::string &receiver_thread_name, const double timeout, bool &timed_out, Message &message)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:94
bool ReceiveResponseByMsgId(const std::string &msg_id, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:210
MessageBus & RegisterThread(const std::string &thread_name)
Register thread which will send/receive messages on the Message Bus.
Definition: messageBus.cpp:64
void SendMessage(const Message &message)
Send a message on the Message Bus.
Definition: messageBus.cpp:82
~MessageBus()
Definition: messageBus.cpp:35
std::string ToString() const
Generate ASCII output providing a status of the object (to the extend possible).
bool ThreadRegistered(const std::string &thread_name) const
Check if thread has been registered as participant in the Message Bus instance.
Definition: messageBus.cpp:56
void Reset()
Reset the internal message queues and other objects.
Definition: messageBus.cpp:45
MessageBus()
Definition: messageBus.cpp:27
void SendResponse(Response &response)
Send a response to a message received on the Message Bus.
Definition: messageBus.cpp:147
bool ReceiveResponse(const std::string &msg_sender_thread_name, const double timeout, bool &timed_out, Response &response)
Check for a message for this thread. Returns true if message available.
Definition: messageBus.cpp:160
IFW CTD Multiprocessing Toolkit Message class.
Definition: message.hpp:19
IFW CTD Multiprocessing Toolkit Response class.
Definition: response.hpp:20
Definition: manager.hpp:15
void SplitDblTime(const double time, int64_t &secs, int64_t &nano_secs)
Definition: messageBus.cpp:20