ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testManager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief daq::ManagerImpl tests
7  */
8 #include <daq/manager.hpp>
9 #include <fmt/ostream.h>
10 #include <regex>
11 
12 #include "mock/daqController.hpp"
13 #include "mock/mockWorkspace.hpp"
14 #include "statusObserver.hpp"
15 #include "utils.hpp"
16 #include <gtest/gtest.h>
17 #include <log4cplus/loggingmacros.h>
18 
19 #include <daq/error/report.hpp>
20 
21 using namespace daq;
22 using namespace ::testing;
23 using namespace std::literals::string_view_literals;
24 using namespace std::chrono_literals;
25 
26 struct FixtureBase : ::testing::Test {
27  template <class T>
28  T ExpectNoException(boost::future<T>& f) {
29  if (!f.is_ready()) {
30  ADD_FAILURE() << "Future is not ready";
31  throw std::runtime_error("test failure");
32  }
33  try {
34  return f.get();
35  } catch (...) {
36  LOG4CPLUS_ERROR("test",
37  "Future contained exception\n"
38  << daq::error::NestedExceptionReporter(std::current_exception()));
39  throw;
40  }
41  }
42  ManagerParams m_params = {"INS"};
43 };
44 
45 /**
46  * @ingroup daq_ocm_libdaq_test
47  */
49 public:
50  void SetUp() override {
51  }
52  void TearDown() override {
53  }
54 };
55 
56 /**
57  * @ingroup daq_ocm_libdaq_test
58  */
59 class TestManagerImpl : public FixtureBase {
60 public:
62  : m_io_ctx()
63  , m_executor(m_io_ctx)
64  , m_event_log(std::make_shared<daq::ObservableEventLog>())
65  , m_manager(m_executor,
66  m_params,
67  m_workspace,
68  m_event_log,
69  m_daq_factory,
70  log4cplus::Logger::getInstance("test")) {
71  }
72  /**
73  * Creates manager and adds two data acquisitions.
74  */
75  void SetUp() override {
76  m_daq_id_1 = "daq1";
77  m_daq_id_2 = "daq2";
78  m_daq_ctx_1.id = m_daq_id_1;
79  m_daq_ctx_1.file_id = "fileid1";
80  m_daq_ctx_2.id = m_daq_id_2;
81  m_daq_ctx_2.file_id = "fileid2";
82 
83  m_daq1_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
84  m_daq2_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
85  m_daq1 = std::make_shared<DaqControllerMock>();
86  m_daq2 = std::make_shared<DaqControllerMock>();
87  m_daq_factory.ocm_mocks["daq1"] = m_daq1;
88  m_daq_factory.ocm_mocks["daq2"] = m_daq2;
89 
90  m_dpm_daq1 = std::make_shared<DaqControllerMock>();
91  m_dpm_daq2 = std::make_shared<DaqControllerMock>();
92  m_daq_factory.dpm_mocks["daq1"] = m_dpm_daq1;
93  m_daq_factory.dpm_mocks["daq2"] = m_dpm_daq2;
94 
95  EXPECT_CALL(*m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_1));
96  EXPECT_CALL(*m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_2));
97  EXPECT_CALL(*m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq1_status));
98  EXPECT_CALL(*m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq2_status));
99  EXPECT_CALL(Const(*m_daq1), GetStatus())
100  .Times(AnyNumber())
101  .WillRepeatedly(Return(m_daq1_status));
102  EXPECT_CALL(Const(*m_daq2), GetStatus())
103  .Times(AnyNumber())
104  .WillRepeatedly(Return(m_daq2_status));
105  EXPECT_CALL(Const(*m_daq1), GetContext())
106  .Times(AnyNumber())
107  .WillRepeatedly(ReturnRef(m_daq_ctx_1));
108 
109  m_daq1_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
110  m_daq1_dpm_status->SetState(State::Transferring);
111  m_daq2_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
112  m_daq2_dpm_status->SetState(State::Merging);
113 
114  EXPECT_CALL(*m_dpm_daq1, GetId()).WillRepeatedly(ReturnRef(m_daq_id_1));
115  EXPECT_CALL(*m_dpm_daq1, GetStatus())
116  .Times(AnyNumber())
117  .WillRepeatedly(Return(m_daq1_status));
118  EXPECT_CALL(Const(*m_dpm_daq1), GetStatus())
119  .Times(AnyNumber())
120  .WillRepeatedly(Return(m_daq1_status));
121  EXPECT_CALL(Const(*m_dpm_daq1), GetContext())
122  .Times(AnyNumber())
123  .WillRepeatedly(ReturnRef(m_daq_ctx_1));
124 
125  EXPECT_CALL(*m_dpm_daq2, GetId()).WillRepeatedly(ReturnRef(m_daq_id_2));
126  EXPECT_CALL(*m_dpm_daq2, GetStatus())
127  .Times(AnyNumber())
128  .WillRepeatedly(Return(m_daq2_dpm_status));
129  EXPECT_CALL(Const(*m_dpm_daq2), GetStatus())
130  .Times(AnyNumber())
131  .WillRepeatedly(Return(m_daq2_dpm_status));
132  EXPECT_CALL(Const(*m_dpm_daq2), GetContext())
133  .Times(AnyNumber())
134  .WillRepeatedly(ReturnRef(m_daq_ctx_2));
135  }
136 
137  auto StartDaq1() -> boost::future<State> {
138  EXPECT_CALL(*m_daq1, StartAsync())
139  .WillOnce(Return(ByMove(boost::make_ready_future<State>(m_daq1_status->GetState()))));
140  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
141  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1"))).Times(AtLeast(1));
142  EXPECT_CALL(m_workspace, StoreContext(_));
143  return m_manager.StartDaqAsync(m_daq_ctx_1);
144  }
145 
146  void TearDown() override {
147  // Run any pending handlers to allow mocks to expire without leaking
148  m_io_ctx.restart();
149  m_io_ctx.poll();
150 
151  m_daq_factory.ocm_mocks.clear();
152  m_daq_factory.dpm_mocks.clear();
153  }
154 
155  boost::asio::io_context m_io_ctx;
158  std::shared_ptr<daq::ObservableEventLog> m_event_log;
159  std::string m_daq_id_1;
160  std::string m_daq_id_2;
163  std::shared_ptr<DaqControllerMock> m_daq1;
164  std::shared_ptr<DaqControllerMock> m_daq2;
165  std::shared_ptr<DaqControllerMock> m_dpm_daq1;
166  std::shared_ptr<DaqControllerMock> m_dpm_daq2;
167  std::shared_ptr<ObservableStatus> m_daq1_status;
168  std::shared_ptr<ObservableStatus> m_daq2_status;
169  std::shared_ptr<ObservableStatus> m_daq1_dpm_status;
170  std::shared_ptr<ObservableStatus> m_daq2_dpm_status;
173 };
174 
175 TEST_F(TestManagerImplLifecycle, AddDaqNotifiesObserver) {
176  // Setup
177  boost::asio::io_context io_ctx;
178  rad::IoExecutor executor(io_ctx);
179  DaqControllerFactoryFake factory;
180  auto event_log = std::make_shared<ObservableEventLog>();
181  DaqContext daq_ctx;
182  MockWorkspace workspace;
183 
184  ManagerImpl mgr(
185  executor, m_params, workspace, event_log, factory, log4cplus::Logger::getInstance("test"));
186  auto id = std::string("id");
187  auto status = std::make_shared<ObservableStatus>(id, "fileid");
188  daq_ctx.id = id;
189  daq_ctx.file_id = id;
190  auto daq = std::make_shared<DaqControllerMock>();
191  factory.ocm_mocks[id] = daq;
192 
193  EXPECT_CALL(*daq, StartAsync())
194  .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
195  EXPECT_CALL(*daq, GetId()).WillRepeatedly(ReturnRef(id));
196  EXPECT_CALL(*daq, GetStatus()).WillRepeatedly(Return(status));
197  EXPECT_CALL(Const(*daq), GetStatus()).WillRepeatedly(Return(status));
198  EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
199 
200  EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
201  EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
202  EXPECT_CALL(workspace, StoreContext(daq_ctx));
203 
205  EXPECT_CALL(o, CallOperator(_));
206  mgr.GetStatusSignal().ConnectObserver(std::reference_wrapper(o));
207 
208  // Run
209  auto f = mgr.StartDaqAsync(daq_ctx);
210  io_ctx.poll();
211  ExpectNoException(f);
212 }
213 
214 TEST_F(TestManagerImplLifecycle, AwaitStateCompletesWithAbandonedManager) {
215  // Setup
216  boost::asio::io_context io_ctx;
217  rad::IoExecutor executor(io_ctx);
218  DaqControllerFactoryFake factory;
219  auto event_log = std::make_shared<ObservableEventLog>();
220  DaqContext daq_ctx;
221  MockWorkspace workspace;
222 
223  auto id = std::string("id");
224  daq_ctx.id = id;
225  daq_ctx.file_id = id;
226  auto status = std::make_shared<ObservableStatus>(id, "fileid");
227  auto daq = std::make_shared<DaqControllerMock>();
228  factory.ocm_mocks[id] = daq;
229  EXPECT_CALL(*daq, StartAsync())
230  .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
231  EXPECT_CALL(*daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(id));
232  EXPECT_CALL(*daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
233  EXPECT_CALL(Const(*daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
234  EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
235 
236  EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
237  EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
238  EXPECT_CALL(workspace, StoreContext(daq_ctx));
239 
240  boost::future<Result<Status>> res;
241  {
242  ManagerImpl mgr(executor,
243  m_params,
244  workspace,
245  event_log,
246  factory,
247  log4cplus::Logger::getInstance("test"));
248  mgr.StartDaqAsync(daq_ctx);
249 
250  // Initiate await that should be aborted when manager is destroyed
251  mgr.AwaitDaqStateAsync("id"sv, State::Acquiring, 5ms).swap(res);
252  ASSERT_FALSE(res.is_ready());
253  }
254 
255  // Run
256  MakeTestProgress(io_ctx, &res);
257  ASSERT_TRUE(res.is_ready());
258  EXPECT_THROW(res.get(), DaqOperationAborted);
259 }
260 
262  {
263  std::regex regex("INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
264  auto id = MakeIdCandidate("INSTRUMENT", 0);
265  EXPECT_TRUE(std::regex_match(id, regex))
266  << "Instrument ID should be truncated to 5 characters if too long";
267  }
268  {
269  std::regex regex("INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
270  auto id = m_manager.MakeDaqId();
271  EXPECT_TRUE(std::regex_match(id, regex));
272  }
273  {
274  auto id1 = MakeIdCandidate("INSTRUMENT", 0);
275  auto id2 = MakeIdCandidate("INSTRUMENT", 1);
276  EXPECT_NE(id1, id2) << "Adding jitter should have made the ID different";
277  }
278 }
279 
280 TEST_F(TestManagerImpl, StartDaqWithSameIdThrowsInvalidArgument) {
281  // Setup
282  EXPECT_CALL(*m_daq1, StartAsync())
283  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
284  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
285  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
286  EXPECT_CALL(m_workspace, StoreContext(_));
287 
288  m_manager.StartDaqAsync(m_daq_ctx_1);
289 
290  // Tes
291  auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
292  m_io_ctx.poll();
293  ASSERT_TRUE(f.is_ready());
294  EXPECT_THROW(f.get(), std::invalid_argument);
295 }
296 
297 TEST_F(TestManagerImpl, StartDaqAsyncStartsDaq) {
298  // Setup
299  EXPECT_CALL(*m_daq1, StartAsync())
300  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
301  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
302  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
303  EXPECT_CALL(m_workspace, StoreContext(_));
304 
305  // Run
306  auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
307  m_io_ctx.poll();
308  ASSERT_TRUE(fut.is_ready());
309  EXPECT_EQ(fut.get(), State::Starting);
310 }
311 
312 /**
313  * Tests the behaviour that if DaqController::StartAsync() fails it should automatically
314  * force-abort the acquisition. This is mainly done because putting that responsibility on the user
315  * is not productive.
316  *
317  * note: This behaviour can be made configurable.
318  */
319 TEST_F(TestManagerImpl, StartDaqAbortsIfDaqControllerFails) {
320  // Setup
321  EXPECT_CALL(*m_daq1, StartAsync())
322  .WillOnce(Return(
323  ByMove(boost::make_exceptional_future<State>(std::runtime_error("START FAILED")))));
324  boost::promise<daq::Status> abort_reply;
325  EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Tolerant))
326  .WillOnce(Return(ByMove(abort_reply.get_future())));
327  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
328  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
329  EXPECT_CALL(m_workspace, StoreContext(_));
330 
331  // Test
332  auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
333  m_io_ctx.poll();
334  ASSERT_FALSE(f.is_ready());
335 
336  // Send reply
337  abort_reply.set_exception(std::logic_error("ABORT ALSO FAILED"));
338 
339  MakeTestProgress(m_io_ctx, &f);
340  ASSERT_TRUE(f.is_ready());
341  EXPECT_THROW(f.get(), std::runtime_error);
342 }
343 
344 TEST_F(TestManagerImpl, StopNonexistingDaqThrowsInvalidArgument) {
345  auto fut = m_manager.StopDaqAsync("nonexistant-id"sv, daq::ErrorPolicy::Strict);
346  ASSERT_TRUE(fut.is_ready());
347  EXPECT_THROW(fut.get(), std::invalid_argument);
348 }
349 
350 TEST_F(TestManagerImpl, StopDaqAsyncStopsDaq) {
351  // Setup
352  StartDaq1();
353 
355  EXPECT_CALL(*m_daq1, StopAsync(daq::ErrorPolicy::Strict))
356  .WillOnce(Return(ByMove(
357  boost::make_ready_future<Status>(Status("daq1", "fileid", State::Stopped, false, t)))));
358 
359  // Run
360  auto fut = m_manager.StopDaqAsync("daq1"sv, daq::ErrorPolicy::Strict);
361  auto status = ExpectNoException(fut);
362  EXPECT_EQ(status.state, State::Stopped);
363  EXPECT_EQ(status.error, false);
364 }
365 
366 TEST_F(TestManagerImpl, AbortNonexistingDaqThrowsInvalidArgumentEvenIfTolerant) {
367  auto fut = m_manager.AbortDaqAsync("nonexistant-id"sv, ErrorPolicy::Tolerant);
368  ASSERT_TRUE(fut.is_ready());
369  EXPECT_THROW(fut.get(), std::invalid_argument);
370 }
371 
372 TEST_F(TestManagerImpl, AbortDaqAsyncAbortsDaq) {
373  // Setup
374  StartDaq1();
375 
376  auto reply_status = daq::Status("daq1", "fileid");
377  reply_status.state = State::AbortingAcquiring;
378  EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Strict))
379  .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
380 
381  // Run
382  auto fut = m_manager.AbortDaqAsync("daq1"sv, ErrorPolicy::Strict);
383  auto result = ExpectNoException(fut);
384  EXPECT_EQ(result.state, State::AbortingAcquiring);
385  EXPECT_FALSE(result.error);
386 }
387 
388 TEST_F(TestManagerImpl, UpdateKeywordsUpdatesKeywords) {
389  // Setup
390  StartDaq1();
391 
392  daq::fits::KeywordVector keywords = {
393  daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
394  daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
395  EXPECT_CALL(*m_daq1, UpdateKeywords(keywords));
396 
397  // Run
398  m_manager.UpdateKeywords("daq1"sv, keywords);
399 }
400 
401 TEST_F(TestManagerImpl, UpdateKeywordsForNonexistingDaqThrowsInvalidArgument) {
402  daq::fits::KeywordVector keywords = {
403  daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
404  daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
405  EXPECT_THROW(m_manager.UpdateKeywords("nonexistant-id"sv, keywords), std::invalid_argument);
406 }
407 
408 TEST_F(TestManagerImpl, GetStatus) {
409  // Setup
410  StartDaq1();
411 
412  // Run
413  auto status = m_manager.GetStatus("daq1"sv);
414  EXPECT_EQ(status.id, "daq1");
415  EXPECT_EQ(status.state, daq::State::NotStarted);
416  EXPECT_FALSE(status.error);
417 }
418 
419 TEST_F(TestManagerImpl, GetStatusThrowsIfDaqDoesNotExist) {
420  // Run
421  EXPECT_THROW(m_manager.GetStatus("nonexistant"sv), std::invalid_argument);
422 }
423 
424 TEST_F(TestManagerImpl, AwaitDaqStateReturnsReadyFutureIfConditionIsFulfilled) {
425  // Run
426  StartDaq1();
427  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::NotStarted, 10ms);
428  m_io_ctx.poll();
429 
430  ASSERT_TRUE(fut.is_ready());
431  auto [timeout, result] = fut.get();
432  EXPECT_FALSE(timeout);
433  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
434 }
435 
436 TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenConditionIsFulfilled) {
437  // Run
438  StartDaq1();
439  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 10ms);
440  ASSERT_FALSE(fut.is_ready());
441  m_daq1_status->SetState(daq::State::Acquiring);
442  MakeTestProgress(m_io_ctx, &fut);
443 
444  ASSERT_TRUE(fut.is_ready());
445  auto [timeout, result] = fut.get();
446  EXPECT_FALSE(timeout);
447  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
448 }
449 
450 TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenItTimesout) {
451  // Run
452  StartDaq1();
453  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 0ms);
454  ASSERT_FALSE(fut.is_ready());
455  MakeTestProgress(m_io_ctx, &fut);
456 
457  ASSERT_TRUE(fut.is_ready()) << "Timer should have triggered to make the future ready";
458  auto [timeout, result] = fut.get();
459  EXPECT_TRUE(timeout);
460  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
461 }
462 
463 TEST_F(TestManagerImpl, StoppedDaqTransitionsToNotScheduled) {
464  // Setup
465  StartDaq1();
466 
467  // Test
468  // Simulate completed Acquisition Phase
469  m_daq1_status->SetState(State::Stopped);
470 
471  EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
472  EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
473 
474  EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
475  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Scheduled))));
476  // When DAQ transitions to using DpmDaqController workspace is updated
477  EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
478  EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
479 
480  // Run pending completion handlers
481  m_io_ctx.poll();
482 
483  // After poll we expect Manager to have created DPM version of daqcontroller and transitioned it
484  // to NotScheduled.
485  EXPECT_EQ(m_daq1_status->GetState(), State::NotScheduled);
486 
487  auto daqs = m_manager.GetDaqControllers();
488  ASSERT_EQ(daqs.size(), 1u);
489  EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
490 }
491 
492 TEST_F(TestManagerImpl, RestoreFromWorkspaceAddsDaq) {
493  // Setup
494  // daq1 should create OCM daq controller
495  // daq2 should create DPM daq controller
496  //
497  // Set time to be not stale
498  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
499  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
500  m_daq1_status->SetState(State::Acquiring);
501  m_daq2_status->SetState(State::Merging);
502 
503  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
504 
505  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
506 
507  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
508  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
509  .WillOnce(Return(m_daq1_status->GetStatus()));
510 
511  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
512  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
513  .WillOnce(Return(m_daq2_status->GetStatus()));
514 
515  EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
516 
517  // Test
518  m_manager.RestoreFromWorkspace();
519 }
520 
521 TEST_F(TestManagerImpl, RestoreFromWorkspaceArchivesStaleDaqs) {
522  // Setup
523  // daq1 should be archived
524  // daq2 should create DPM daq controller
525  //
526  // Set time to be not stale
527  m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
528  m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
529 
530  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
531 
532  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
533 
534  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
535  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
536  .WillOnce(Return(m_daq1_status->GetStatus()));
537  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
538 
539  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
540  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
541  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
542  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
543 
544  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
545 
546  // Test
547  m_manager.RestoreFromWorkspace();
548 }
549 
550 TEST_F(TestManagerImpl, RestoreFromWorkspaceSkipsMissingDaq) {
551  // Setup
552  // daq1 should be archived
553  // daq2 should create DPM daq controller
554  //
555  // Set time to be not stale
556  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
557  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
558 
559  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
560  std::vector<std::string> to_store = {m_daq_ctx_2.id};
561 
562  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
563 
564  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
565  .WillOnce(Throw(std::runtime_error("ouch")));
566  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
567  .Times(Between(0, 1))
568  .WillRepeatedly(Return(m_daq1_status->GetStatus()));
569  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
570 
571  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
572  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
573  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
574 
575  EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
576 
577  // Test
578  m_manager.RestoreFromWorkspace();
579 }
580 
581 TEST_F(TestManagerImpl, RestoreFromWorkspaceIgnoresArchiveFailure) {
582  // Setup
583  // daq1 should be archived
584  // daq2 should create DPM daq controller
585  //
586  // Set time to be not stale
587  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
588  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
589  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
590  std::vector<std::string> to_store = {m_daq_ctx_2.id};
591 
592  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
593 
594  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
595  .WillOnce(Throw(std::runtime_error("ouch")));
596  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
597  .Times(Between(0, 1))
598  .WillRepeatedly(Return(m_daq1_status->GetStatus()));
599  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
600  .WillOnce(Throw(std::runtime_error("ouch")));
601 
602  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
603  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
604  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
605 
606  EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
607 
608  // Test
609  m_manager.RestoreFromWorkspace();
610 }
611 
612 TEST_F(TestManagerImpl, ContextSignalUpdatesWorkspace) {
613  // Setup
614  StartDaq1();
615 
616  EXPECT_CALL(m_workspace, StoreContext(Field(&DaqContext::id, "daq1")));
617 
618  // Run
619  m_daq1->signal(m_daq_ctx_1);
620 }
Started operation was aborted.
Definition: error.hpp:47
Implements daq::Manager.
Definition: manager.hpp:253
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:485
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:433
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:529
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:82
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
DaqContext m_daq_ctx_1
void TearDown() override
Definition: testManager.cpp:52
ManagerImpl m_manager
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
std::string m_daq_id_2
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
Definition: testManager.cpp:75
void TearDown() override
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
void SetUp() override
Definition: testManager.cpp:50
std::string m_daq_id_1
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
DaqContext m_daq_ctx_2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
Simple observer used for testing.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
Definition: keyword.hpp:330
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hiearchical keyword.
Definition: keyword.hpp:337
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Merging
DAQ is being merged.
@ Transferring
Input files are being transferred.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
Configurations parameters directly related to manager.
Definition: manager.hpp:35
T ExpectNoException(boost::future< T > &f)
Definition: testManager.cpp:28
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:60
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
std::chrono::time_point< std::chrono::steady_clock > TimePoint
Definition: status.hpp:125
std::string id
Definition: status.hpp:140
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
Mock of DaqController.
Defines shared test utilities.