ifw-ccf  3.0.0-pre2
application.hpp
Go to the documentation of this file.
1 
5 #ifndef CCF_CONTROL_APPLICATION_HPP_
6 #define CCF_CONTROL_APPLICATION_HPP_
7 
8 #include <mal/Mal.hpp>
9 
10 #include <rad/mal/replier.hpp>
11 #include <rad/dbAdapterRedis.hpp>
12 
13 #include <Recif.hpp>
14 
15 #include <ccf/common/base.hpp>
19 #include <ccf/common/appBase.hpp>
20 #include <ccf/control/config.hpp>
33 
34 namespace ccf::control {
35 
// String tags for threads and frame queues (as suggested by the constant names).
// NOTE(review): the "{}" in OUTPUT_Q_TAG looks like a format placeholder to be
// filled in by the code using the tag -- confirm against the call sites.
//
// Declared 'inline' (C++17) so that every translation unit including this
// header refers to the same object instead of getting a private copy.
// @{
inline const std::string MON_THR_TAG = "MonThr";
inline const std::string ACQ_THR_TAG = "AcqThr";
inline const std::string INPUT_Q_TAG = "InputQueue";
inline const std::string OUTPUT_Q_TAG = "OutputQueue::{}";
// @}
43 
// Names of the thread signals (as suggested by the constant names).
//
// Declared 'inline' (C++17) so that every translation unit including this
// header refers to the same object instead of getting a private copy.
// @{
inline const std::string THREAD_SIG_TERM = "SigTerm";
inline const std::string THREAD_SIG_SETUP = "SigSetup";
inline const std::string THREAD_SIG_NEW_DATA = "SigNewData";
inline const std::string THREAD_SIG_PUBLISH = "SigPublish";
inline const std::string THREAD_SIG_START = "SigStart";
// @}
52 
53  //const std::string THREAD_SYNC_ACK_OK = "SigAckOk";
54  //const std::string THREAD_SYNC_ACK_ERROR = "SigAckError";
55  //const std::string THREAD_SYNC_UNKNOWN = "SigUnknown";
56 
// Names of the mutexes (as suggested by the constant names).
//
// Declared 'inline' (C++17) so that every translation unit including this
// header refers to the same object instead of getting a private copy.
// @{
inline const std::string ACTIVE_REC_MUTEX = "ActiveRecMutex";
inline const std::string REC_HISTORY_MUTEX = "RecHistoryMutex";
inline const std::string MUTEX_REC_WAIT_REQUESTS = "RecWaitRequests";
// @}
63 
65  template <typename T>
66  void CcfRequestRejectHandler(const rad::AnyEvent& event, const std::string& state) {
67  LOG4CPLUS_TRACE_METHOD(Logger(), __PRETTY_FUNCTION__);
68  auto req = rad::GetPayloadNothrow<T>(event);
69  if (req == nullptr) {
70  LOG4CPLUS_ERROR(Logger(), event.GetId() << " has no associated request!");
71  return;
72  }
73  req->SetException(elt::mal::MalException("Request rejected in state " + state));
74  }
75 
81  public:
82 
85 
    /// Return reference to unique instance of the application class.
87  static Application& Instance();
88 
89  virtual ~Application();
90 
    /// Return reference to the MAL (Singleton) instance.
92  elt::mal::Mal& GetMal();
93 
    /// Initialise and execute the application.
    /// @param argc Number of entries in argv.
    /// @param argv Argument vector (presumably the command line arguments
    ///             passed on from main() -- confirm in application.cpp).
95  int Execute(int argc, char* argv[]);
96 
    /// NOTE(review): no description available in this listing; judging by the
    /// name this creates the application's threads -- confirm in application.cpp.
99  void CreateThreads();
100 
     /// Load the Initialisation Setup specified in the configuration and install it.
102  void LoadInitSetup();
103 
     /// Handle the setup of the application.
105  void HandleSetup();
106 
     /// NOTE(review): no description available in this listing -- see application.cpp.
109  void Dismantle();
110 
     /// Returns true if image acquisition is active.
112  bool GetAcquisitionActive();
113 
     /// Instruct the system to start an acquisition (finite or infinite).
115  void StartAcquistion();
116 
     /// NOTE(review): no description available in this listing; presumably the
     /// counterpart of StartAcquistion() -- confirm in application.cpp.
119  void StopAcquistion();
120 
     /// Check if the conditions are met to stop an ongoing acquisition session.
122  void CheckStopAcquistion();
123 
     /// Increment frame counter, counting the number of complete frames received.
125  void IncrementFrameCounter();
126 
     /// Return the current state (status) of the SCXML state machine.
128  std::string GetState();
129 
     /// Set status of the given request.
     /// @param request Request for which to set the status.
     /// @param status  Status to record for the request.
131  void SetRequestStatus(const std::string& request,
132  const std::string& status);
133 
     /// Get the status of the given request.
     /// @param request Request for which to get the status.
135  std::string GetRequestStatus(const std::string& request) const;
136 
     /// List the available adapters registered in this executable.
138  std::string ListAdapters();
139 
141  template <class COM_ADPT_TYPE, class SIM_COM_ADPT_TYPE>
142  void RegisterComAdapters(const COM_ADPT_TYPE& com_adapter_factory_obj,
143  const SIM_COM_ADPT_TYPE& sim_com_adapter_factory_obj) {
144  LOG4CPLUS_TRACE_METHOD(Logger(), __PRETTY_FUNCTION__);
145  // if (m_com_adapter != nullptr) {
146  // delete m_com_adapter;
147  // }
148  // if (m_sim_com_adapter != nullptr) {
149  // delete m_sim_com_adapter;
150  // }
151  m_com_adapter = std::make_unique<COM_ADPT_TYPE>();
152  ccf::AssertPtr(m_com_adapter.get(), "Communication Adapter", CCFLOC);
153  m_sim_com_adapter = std::make_unique<SIM_COM_ADPT_TYPE>();
154  ccf::AssertPtr(m_sim_com_adapter.get(), "Simulation Communication Adapter", CCFLOC);
155  }
156 
160 
     /// Connect to the associated device.
162  void ConnectDevice();
163 
     /// Disconnect from the associated device.
165  void DisconnectDevice();
166 
     /// Get the names of the Processing Threads running.
168  const std::vector<std::string>& GetProcThreadNames() const;
169 
     /// Get the names of the Publisher Threads running for a specific Processing Thread.
     /// @param proc_thread_name Name of the Processing Thread.
171  const std::vector<std::string>& GetPubThreadNames(const std::string& proc_thread_name) const;
172 
     /// Get the list of (pointers to the) Publisher Threads.
174  const std::list<PubThread*>& GetPubThreads() const;
175 
     /// Returns true if the thread is a Processing Thread.
     /// @param thread_name Name of the thread to check.
177  bool IsProcThread(const std::string& thread_name);
178 
     /// Returns true if the thread is a Publisher Thread.
     /// @param thread_name Name of the thread to check.
180  bool IsPubThread(const std::string& thread_name);
181 
     // Copying is disabled: both the copy constructor and the copy assignment
     // operator are deleted.
182  Application(const Application&) = delete;
183  Application& operator=(const Application&) = delete;
184 
186  // Recording handling
188 
     /// NOTE(review): no reliable description available in this listing;
     /// judging by the name this updates the recording status -- confirm in
     /// application.cpp.
190  void UpdateRecordingStatus();
191 
     /// Set the Active Recording Status Object.
     /// @param active_rec Recording Status object to set as the active one.
193  void SetActiveRecStatusObj(std::shared_ptr<recif::RecStatus>& active_rec);
194 
     /// Update the Active Recording Status Object in memory and in the OLDB.
     /// @param sum_rec_pub_status Publisher status used for the update
     ///        (presumably a summary over the recording publishers -- confirm).
196  void UpdateActiveRecStatusObj(const ccf::common::PublisherStatus& sum_rec_pub_status);
197 
     /// Update the status of the current Recording Session in the DB.
199  void UpdateRecStatusInDb();
200 
     /// Get the currently active Recording status.
     /// @param active_rec Output: receives the active Recording Status object.
202  void GetActiveRecStatus(std::shared_ptr<recif::RecStatus>& active_rec) const;
203 
206  std::shared_ptr<recif::RecStatus>& active_rec);
207 
     /// Set the status of an active Recording Session (Active Recording Object).
     /// @param status Status to set.
209  void SetRecordingStatus(const ccf::PubStatus status);
210 
     /// Check if a Recording with the given ID is found and returns its status.
     /// @param rec_id     ID of the Recording to look up.
     /// @param active_rec Output: receives the Recording Status object.
212  void GetRecStatus(const std::string& rec_id,
213  std::shared_ptr<recif::RecStatus>& active_rec) const;
214 
     /// Get the information about the most recent Recording (active or in the history).
     /// @param active_rec Output: receives the Recording Status object.
216  void GetMostRecentRecStatus(std::shared_ptr<recif::RecStatus>& active_rec) const;
217 
     /// Add a RecWaitRequest in the internal registry.
     /// @param req_wait_req Request to register.
219  void AddRecWaitRequest(std::shared_ptr<RecWaitRequest>& req_wait_req);
220 
     /// Check if there are pending requests from RecWait Requests and send response if needed.
     /// @param recording_completed Defaults to false (presumably signals that
     ///        the recording has completed -- confirm in application.cpp).
222  void CheckRecWaitRequests(const bool recording_completed=false);
223 
     /// Check if there are RecWait Requests that can be removed from the queue.
225  void CleanUpRecWaitRequests();
227 
228  private:
     // The constructor is private; the instance is obtained via Instance().
229  Application();
230 
231  int8_t m_simulation;
232  std::unique_ptr<rad::cii::Replier> m_mal_replier;
233  std::shared_ptr<elt::mal::Mal> m_mal;
234  std::unique_ptr<DataContext> m_data_ctx;
235  ActionMgr m_action_mgr;
236  //StdActions* m_actions_std;
237  bool m_dismantle_invoked;
238 
     // Presumably maps request -> status as used by SetRequestStatus() and
     // GetRequestStatus() -- confirm in application.cpp.
239  std::map<std::string, std::string> m_request_status_map;
240 
     // Adapter instances created by RegisterComAdapters().
241  std::unique_ptr<ccf::common::ComAdptBase> m_com_adapter;
242  std::unique_ptr<ccf::common::ComAdptBase> m_sim_com_adapter;
243 
244  std::map<std::string, std::shared_ptr<ccf::common::FrameQueue>> m_frame_queues;
245 
246  std::vector<std::string> m_proc_thread_names;
247  std::map<std::string, std::vector<std::string>> m_proc_to_pub_thread_names;
248 
249  std::map<std::string, bool> m_pub_thread_names;
250 
251  // For easy access. The threads are handled by the MPTK.
252  std::list<PubThread*> m_pub_threads;
253 
254  // Acquisition control.
255  ccf::ExpoMode m_acq_setup_expo_mode;
256  int64_t m_acq_setup_nb_of_frames;
257  int64_t m_acq_status_frame_count;
258 
259  // Recording session control.
260  std::shared_ptr<recif::RecStatus> m_active_recording;
261  int64_t m_exp_nb_of_frames;
262  int32_t m_max_sz_rec_history;
263  int32_t m_rec_history_exp;
264  // Maps "<rec id>" into the set of associated Recording Sessions.
265  std::map<std::string, std::shared_ptr<recif::RecStatus>> m_rec_history;
266  std::vector<std::string> m_rec_id_history;
267 
268  std::map<std::string, std::string> m_file_stored_to_pub_id;
269 
270  // Pending RecWait Requests.
271  std::map<std::string, std::shared_ptr<RecWaitRequest>> m_rec_wait_requests;
272 };
273 
274 inline Application& App() {
275  return Application::Instance();
276 }
277 
278 } // namespace ccf::control
279 
280 #endif // CCF_CONTROL_APPLICATION_HPP_
#define CCFLOC
Macro generating a location identifier: "<file>:<line>:<function>:<thread>".
Definition: base.hpp:441
Class to be used as parent for CCF application type of classes.
Definition: appBase.hpp:25
Class to be used as parent for CCF Communication Adapters.
Definition: comAdptBase.hpp:31
Class used by a Publisher to handle its own publisher status.
Definition: pubBase.hpp:32
Responsible for the life-cycle management of actions and activities.
Definition: actionMgr.hpp:19
Implements the core CCF Control Application.
Definition: application.hpp:80
void UpdateRecordingStatus()
Disable assignment operator.
Definition: application.cpp:572
bool GetAcquisitionActive()
Returns true if image acquisition is active.
Definition: application.cpp:540
virtual ~Application()
Definition: application.cpp:55
void LoadInitSetup()
Load the Initialisation Setup specified in the configuration and install it.
Definition: application.cpp:516
std::string GetRequestStatus(const std::string &request) const
Get the status of the given request.
Definition: application.cpp:912
std::string GetState()
Return the current state (status) of the SCXML state machine.
Definition: application.cpp:567
void CheckStopAcquistion()
Check if the conditions are met to stop an ongoing acquisition session.
Definition: application.cpp:694
static Application * s_app_instance
Singleton instance.
Definition: application.hpp:84
Application & operator=(const Application &)=delete
Disable copy assignment operator.
void CreateThreads()
Definition: application.cpp:371
void SetActiveRecStatusObj(std::shared_ptr< recif::RecStatus > &active_rec)
Set the Active Recording Status Object.
Definition: application.cpp:802
void StartAcquistion()
Instruct the system to start an acquisition (finite or infinite).
Definition: application.cpp:665
void DisconnectDevice()
Disconnect from the associated device.
Definition: application.cpp:339
const std::list< PubThread * > & GetPubThreads() const
Get the list of (pointers to the) Publisher Threads running.
Definition: application.cpp:562
ccf::common::ComAdptBase & GetComAdapter()
Definition: application.cpp:503
void StopAcquistion()
Definition: application.cpp:711
int Execute(int argc, char *argv[])
Initialise and execute the application.
Definition: application.cpp:66
bool IsPubThread(const std::string &thread_name)
Returns true if the thread is a Publisher Thread.
Definition: application.cpp:927
void SetRequestStatus(const std::string &request, const std::string &status)
Set status of the given request.
Definition: application.cpp:906
void RegisterComAdapters(const COM_ADPT_TYPE &com_adapter_factory_obj, const SIM_COM_ADPT_TYPE &sim_com_adapter_factory_obj)
Method to allocate and register the Communication Adapters.
Definition: application.hpp:142
void HandleSetup()
Handle the setup of the application.
Definition: application.cpp:524
void GetActiveRecStatus(std::shared_ptr< recif::RecStatus > &active_rec) const
Get the currently active Recording status.
Definition: application.cpp:731
bool IsProcThread(const std::string &thread_name)
Returns true if the thread is a Processing Thread.
Definition: application.cpp:922
void Dismantle()
Definition: application.cpp:233
std::string ListAdapters()
List the available adapters registered in this executable.
Definition: application.cpp:355
void GetMostRecentRecStatus(std::shared_ptr< recif::RecStatus > &active_rec) const
Get the information about the most recent Recording (active or in the history).
Definition: application.cpp:778
const std::vector< std::string > & GetProcThreadNames() const
Get the names of the Processing Threads running.
Definition: application.cpp:545
elt::mal::Mal & GetMal()
Return reference to the MAL (Singleton) instance.
Definition: application.cpp:60
void AddRecWaitRequest(std::shared_ptr< RecWaitRequest > &req_wait_req)
Add a RecWaitRequest in the internal registry.
Definition: application.cpp:839
void UpdateActiveRecStatusObj(const ccf::common::PublisherStatus &sum_rec_pub_status)
Update the Active Recording Status Object in memory and in the OLDB.
Definition: application.cpp:611
void GetRecStatus(const std::string &rec_id, std::shared_ptr< recif::RecStatus > &active_rec) const
Check if a Recording with the given ID is found and returns its status.
Definition: application.cpp:739
void CheckRecWaitRequests(const bool recording_completed=false)
Check if there are pending requests from RecWait Requests and send response if needed.
Definition: application.cpp:847
void CleanUpRecWaitRequests()
Check if there are RecWait Requests that can be removed from the queue.
Definition: application.cpp:885
void IncrementFrameCounter()
Increment frame counter, counting the number of complete frames received.
Definition: application.cpp:726
static Application & Instance()
Return reference to unique instance of the application class.
Definition: application.cpp:34
const std::vector< std::string > & GetPubThreadNames(const std::string &proc_thread_name) const
Get the names of the Publisher Threads running for a specific Processing Thread.
Definition: application.cpp:551
void MoveActiveRecStatusToHistory(const ccf::PubStatus status, std::shared_ptr< recif::RecStatus > &active_rec)
Move the Active Rec Status object to the Rec Status History.
Definition: application.cpp:811
void ConnectDevice()
Connect to the associated device.
Definition: application.cpp:321
Application(const Application &)=delete
void UpdateRecStatusInDb()
Update the status of the current Recording Session in the DB.
Definition: application.cpp:638
void SetRecordingStatus(const ccf::PubStatus status)
Set the status of an active Recording Session (Active Recording Object).
Definition: application.cpp:783
Definition: acqThread.cpp:10
const std::string THREAD_SIG_PUBLISH
Definition: application.hpp:49
const std::string OUTPUT_Q_TAG
Definition: application.hpp:41
void CcfRequestRejectHandler(const rad::AnyEvent &event, const std::string &state)
Template function RAD request rejection check/handler.
Definition: application.hpp:66
const std::string ACTIVE_REC_MUTEX
Definition: application.hpp:59
const std::string INPUT_Q_TAG
Definition: application.hpp:40
const std::string REC_HISTORY_MUTEX
Definition: application.hpp:60
const std::string THREAD_SIG_START
Definition: application.hpp:50
const std::string MON_THR_TAG
Definition: application.hpp:38
const std::string THREAD_SIG_TERM
Definition: application.hpp:46
const std::string ACQ_THR_TAG
Definition: application.hpp:39
const std::string THREAD_SIG_SETUP
Definition: application.hpp:47
Application & App()
Definition: application.hpp:274
const std::string THREAD_SIG_NEW_DATA
Definition: application.hpp:48
const std::string MUTEX_REC_WAIT_REQUESTS
Definition: application.hpp:61
void AssertPtr(const void *ptr, const std::string &object, const std::string &location)
Check that pointer is not nullptr and raise rad::exception in case it is.
Definition: base.cpp:53
log4cplus::Logger & Logger()
Definition: base.cpp:9
PubStatus
Defines the various possible states of a Data Publisher.
Definition: base.hpp:271
ExpoMode
Exposure modes.
Definition: base.hpp:299