ifw-daq  3.0.1
IFW Data Acquisition modules
testManager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief daq::ManagerImpl tests
7  */
8 #include <daq/manager.hpp>
9 #include <fmt/ostream.h>
10 #include <regex>
11 
12 #include "mock/daqController.hpp"
13 #include "mock/dpmClientMock.hpp"
14 #include "mock/mockWorkspace.hpp"
15 #include "statusObserver.hpp"
16 #include "utils.hpp"
17 #include <gtest/gtest.h>
18 #include <log4cplus/loggingmacros.h>
19 
20 #include <daq/error/report.hpp>
21 
22 using namespace daq;
23 using namespace ::testing;
24 using namespace std::literals::string_view_literals;
25 using namespace std::chrono_literals;
26 
27 struct FixtureBase : ::testing::Test {
28  template <class T>
29  T ExpectNoException(boost::future<T>& f) {
30  if (!f.is_ready()) {
31  ADD_FAILURE() << "Future is not ready";
32  throw std::runtime_error("test failure");
33  }
34  try {
35  return f.get();
36  } catch (...) {
37  LOG4CPLUS_ERROR("test",
38  "Future contained exception\n"
39  << daq::error::NestedExceptionReporter(std::current_exception()));
40  throw;
41  }
42  }
43  ManagerParams m_params = {"INS"};
44 };
45 
46 /**
47  * @ingroup daq_ocm_libdaq_test
48  */
50 public:
51  void SetUp() override {
52  }
53  void TearDown() override {
54  }
55 };
56 
57 /**
58  * @ingroup daq_ocm_libdaq_test
59  */
60 class TestManagerImpl : public FixtureBase {
61 public:
63  : m_io_ctx()
64  , m_executor(m_io_ctx)
65  , m_dpm_client(std::make_shared<DpmClientMock>())
66  , m_event_log(std::make_shared<daq::ObservableEventLog>())
67  , m_manager(m_executor,
68  m_params,
69  m_workspace,
70  m_event_log,
71  m_daq_factory,
72  m_dpm_client,
73  log4cplus::Logger::getInstance("test")) {
74  }
75  /**
76  * Creates manager and adds two data acquisitions.
77  */
78  void SetUp() override {
79  m_daq_id_1 = "daq1";
80  m_daq_id_2 = "daq2";
81  m_daq_ctx_1.id = m_daq_id_1;
82  m_daq_ctx_1.file_id = "fileid1";
83  m_daq_ctx_2.id = m_daq_id_2;
84  m_daq_ctx_2.file_id = "fileid2";
85 
86  m_daq1_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
87  m_daq2_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
88  m_daq1 = std::make_shared<DaqControllerMock>();
89  m_daq2 = std::make_shared<DaqControllerMock>();
90  m_daq_factory.ocm_mocks["daq1"] = m_daq1;
91  m_daq_factory.ocm_mocks["daq2"] = m_daq2;
92 
93  m_dpm_daq1 = std::make_shared<DaqControllerMock>();
94  m_dpm_daq2 = std::make_shared<DaqControllerMock>();
95  m_daq_factory.dpm_mocks["daq1"] = m_dpm_daq1;
96  m_daq_factory.dpm_mocks["daq2"] = m_dpm_daq2;
97 
98  EXPECT_CALL(*m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_1));
99  EXPECT_CALL(*m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_2));
100  EXPECT_CALL(*m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq1_status));
101  EXPECT_CALL(*m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq2_status));
102  EXPECT_CALL(Const(*m_daq1), GetStatus())
103  .Times(AnyNumber())
104  .WillRepeatedly(Return(m_daq1_status));
105  EXPECT_CALL(Const(*m_daq2), GetStatus())
106  .Times(AnyNumber())
107  .WillRepeatedly(Return(m_daq2_status));
108  EXPECT_CALL(Const(*m_daq1), GetContext())
109  .Times(AnyNumber())
110  .WillRepeatedly(ReturnRef(m_daq_ctx_1));
111 
112  m_daq1_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
113  m_daq1_dpm_status->SetState(State::Collecting);
114  m_daq2_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
115  m_daq2_dpm_status->SetState(State::Merging);
116 
117  EXPECT_CALL(*m_dpm_daq1, GetId()).WillRepeatedly(ReturnRef(m_daq_id_1));
118  EXPECT_CALL(*m_dpm_daq1, GetStatus())
119  .Times(AnyNumber())
120  .WillRepeatedly(Return(m_daq1_status));
121  EXPECT_CALL(Const(*m_dpm_daq1), GetStatus())
122  .Times(AnyNumber())
123  .WillRepeatedly(Return(m_daq1_status));
124  EXPECT_CALL(Const(*m_dpm_daq1), GetContext())
125  .Times(AnyNumber())
126  .WillRepeatedly(ReturnRef(m_daq_ctx_1));
127 
128  EXPECT_CALL(*m_dpm_daq2, GetId()).WillRepeatedly(ReturnRef(m_daq_id_2));
129  EXPECT_CALL(*m_dpm_daq2, GetStatus())
130  .Times(AnyNumber())
131  .WillRepeatedly(Return(m_daq2_dpm_status));
132  EXPECT_CALL(Const(*m_dpm_daq2), GetStatus())
133  .Times(AnyNumber())
134  .WillRepeatedly(Return(m_daq2_dpm_status));
135  EXPECT_CALL(Const(*m_dpm_daq2), GetContext())
136  .Times(AnyNumber())
137  .WillRepeatedly(ReturnRef(m_daq_ctx_2));
138  }
139 
140  auto StartDaq1() -> boost::future<State> {
141  EXPECT_CALL(*m_daq1, StartAsync())
142  .WillOnce(Return(ByMove(boost::make_ready_future<State>(m_daq1_status->GetState()))));
143  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
144  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1"))).Times(AtLeast(1));
145  EXPECT_CALL(m_workspace, StoreContext(_));
146  return m_manager.StartDaqAsync(m_daq_ctx_1);
147  }
148 
149  void TearDown() override {
150  // Run any pending handlers to allow mocks to expire without leaking
151  m_io_ctx.restart();
152  m_io_ctx.poll();
153 
154  m_daq_factory.ocm_mocks.clear();
155  m_daq_factory.dpm_mocks.clear();
156  }
157 
158  boost::asio::io_context m_io_ctx;
161  std::shared_ptr<DpmClientMock> m_dpm_client;
162  std::shared_ptr<daq::ObservableEventLog> m_event_log;
163  std::string m_daq_id_1;
164  std::string m_daq_id_2;
167  std::shared_ptr<DaqControllerMock> m_daq1;
168  std::shared_ptr<DaqControllerMock> m_daq2;
169  std::shared_ptr<DaqControllerMock> m_dpm_daq1;
170  std::shared_ptr<DaqControllerMock> m_dpm_daq2;
171  std::shared_ptr<ObservableStatus> m_daq1_status;
172  std::shared_ptr<ObservableStatus> m_daq2_status;
173  std::shared_ptr<ObservableStatus> m_daq1_dpm_status;
174  std::shared_ptr<ObservableStatus> m_daq2_dpm_status;
177 };
178 
179 TEST_F(TestManagerImplLifecycle, AddDaqNotifiesObserver) {
180  // Setup
181  boost::asio::io_context io_ctx;
182  rad::IoExecutor executor(io_ctx);
183  DaqControllerFactoryFake factory;
184  auto event_log = std::make_shared<ObservableEventLog>();
185  DaqContext daq_ctx;
186  MockWorkspace workspace;
187  std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
188 
189  ManagerImpl mgr(executor,
190  m_params,
191  workspace,
192  event_log,
193  factory,
194  dpm_client,
195  log4cplus::Logger::getInstance("test"));
196  auto id = std::string("id");
197  auto status = std::make_shared<ObservableStatus>(id, "fileid");
198  daq_ctx.id = id;
199  daq_ctx.file_id = id;
200  auto daq = std::make_shared<DaqControllerMock>();
201  factory.ocm_mocks[id] = daq;
202 
203  EXPECT_CALL(*daq, StartAsync())
204  .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
205  EXPECT_CALL(*daq, GetId()).WillRepeatedly(ReturnRef(id));
206  EXPECT_CALL(*daq, GetStatus()).WillRepeatedly(Return(status));
207  EXPECT_CALL(Const(*daq), GetStatus()).WillRepeatedly(Return(status));
208  EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
209 
210  EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
211  EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
212  EXPECT_CALL(workspace, StoreContext(daq_ctx));
213 
215  EXPECT_CALL(o, CallOperator(_));
216  mgr.GetStatusSignal().ConnectObserver(std::reference_wrapper(o));
217 
218  // Run
219  auto f = mgr.StartDaqAsync(daq_ctx);
220  io_ctx.poll();
221  ExpectNoException(f);
222 }
223 
224 TEST_F(TestManagerImplLifecycle, AwaitStateCompletesWithAbandonedManager) {
225  // Setup
226  boost::asio::io_context io_ctx;
227  rad::IoExecutor executor(io_ctx);
228  DaqControllerFactoryFake factory;
229  auto event_log = std::make_shared<ObservableEventLog>();
230  DaqContext daq_ctx;
231  MockWorkspace workspace;
232  std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
233 
234  auto id = std::string("id");
235  daq_ctx.id = id;
236  daq_ctx.file_id = id;
237  auto status = std::make_shared<ObservableStatus>(id, "fileid");
238  auto daq = std::make_shared<DaqControllerMock>();
239  factory.ocm_mocks[id] = daq;
240  EXPECT_CALL(*daq, StartAsync())
241  .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
242  EXPECT_CALL(*daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(id));
243  EXPECT_CALL(*daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
244  EXPECT_CALL(Const(*daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
245  EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
246 
247  EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
248  EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
249  EXPECT_CALL(workspace, StoreContext(daq_ctx));
250 
251  boost::future<Result<Status>> res;
252  {
253  ManagerImpl mgr(executor,
254  m_params,
255  workspace,
256  event_log,
257  factory,
258  dpm_client,
259  log4cplus::Logger::getInstance("test"));
260  mgr.StartDaqAsync(daq_ctx);
261 
262  // Initiate await that should be aborted when manager is destroyed
263  mgr.AwaitDaqStateAsync("id"sv, State::Acquiring, 5ms).swap(res);
264  ASSERT_FALSE(res.is_ready());
265  }
266 
267  // Run
268  MakeTestProgress(io_ctx, &res);
269  ASSERT_TRUE(res.is_ready());
270  EXPECT_THROW(res.get(), DaqOperationAborted);
271 }
272 
274  {
275  std::regex regex("INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
276  auto id = MakeIdCandidate("INSTRUMENT", 0);
277  EXPECT_TRUE(std::regex_match(id, regex))
278  << "Instrument ID should be truncated to 5 characters if too long";
279  }
280  {
281  std::regex regex("INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
282  auto id = m_manager.MakeDaqId();
283  EXPECT_TRUE(std::regex_match(id, regex));
284  }
285  {
286  auto id1 = MakeIdCandidate("INSTRUMENT", 0);
287  auto id2 = MakeIdCandidate("INSTRUMENT", 1);
288  EXPECT_NE(id1, id2) << "Adding jitter should have made the ID different";
289  }
290 }
291 
292 TEST_F(TestManagerImpl, StartDaqWithSameIdThrowsInvalidArgument) {
293  // Setup
294  EXPECT_CALL(*m_daq1, StartAsync())
295  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
296  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
297  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
298  EXPECT_CALL(m_workspace, StoreContext(_));
299 
300  m_manager.StartDaqAsync(m_daq_ctx_1);
301 
302  // Tes
303  auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
304  m_io_ctx.poll();
305  ASSERT_TRUE(f.is_ready());
306  EXPECT_THROW(f.get(), std::invalid_argument);
307 }
308 
309 TEST_F(TestManagerImpl, StartDaqAsyncStartsDaq) {
310  // Setup
311  EXPECT_CALL(*m_daq1, StartAsync())
312  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
313  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
314  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
315  EXPECT_CALL(m_workspace, StoreContext(_));
316 
317  // Run
318  auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
319  m_io_ctx.poll();
320  ASSERT_TRUE(fut.is_ready());
321  EXPECT_EQ(fut.get(), State::Starting);
322 }
323 
324 /**
325  * Tests the behaviour that if DaqController::StartAsync() fails it should automatically
326  * force-abort the acquisition. This is mainly done because putting that responsibility on the user
327  * is not productive.
328  *
329  * note: This behaviour can be made configurable.
330  */
331 TEST_F(TestManagerImpl, StartDaqAbortsIfDaqControllerFails) {
332  // Setup
333  EXPECT_CALL(*m_daq1, StartAsync())
334  .WillOnce(Return(
335  ByMove(boost::make_exceptional_future<State>(std::runtime_error("START FAILED")))));
336  boost::promise<daq::Status> abort_reply;
337  EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Tolerant))
338  .WillOnce(Return(ByMove(abort_reply.get_future())));
339  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
340  EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
341  EXPECT_CALL(m_workspace, StoreContext(_));
342 
343  // Test
344  auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
345  m_io_ctx.poll();
346  ASSERT_FALSE(f.is_ready());
347 
348  // Send reply
349  abort_reply.set_exception(std::logic_error("ABORT ALSO FAILED"));
350 
351  MakeTestProgress(m_io_ctx, &f);
352  ASSERT_TRUE(f.is_ready());
353  EXPECT_THROW(f.get(), std::runtime_error);
354 }
355 
356 TEST_F(TestManagerImpl, StopNonexistingDaqThrowsInvalidArgument) {
357  auto fut = m_manager.StopDaqAsync("nonexistant-id"sv, daq::ErrorPolicy::Strict);
358  ASSERT_TRUE(fut.is_ready());
359  EXPECT_THROW(fut.get(), std::invalid_argument);
360 }
361 
362 TEST_F(TestManagerImpl, StopDaqAsyncStopsDaq) {
363  // Setup
364  StartDaq1();
365 
367  EXPECT_CALL(*m_daq1, StopAsync(daq::ErrorPolicy::Strict))
368  .WillOnce(Return(ByMove(
369  boost::make_ready_future<Status>(Status("daq1", "fileid", State::Stopped, false, t)))));
370 
371  // Run
372  auto fut = m_manager.StopDaqAsync("daq1"sv, daq::ErrorPolicy::Strict);
373  auto status = ExpectNoException(fut);
374  EXPECT_EQ(status.state, State::Stopped);
375  EXPECT_EQ(status.error, false);
376 }
377 
378 TEST_F(TestManagerImpl, AbortNonexistingDaqThrowsInvalidArgumentEvenIfTolerant) {
379  auto fut = m_manager.AbortDaqAsync("nonexistant-id"sv, ErrorPolicy::Tolerant);
380  ASSERT_TRUE(fut.is_ready());
381  EXPECT_THROW(fut.get(), std::invalid_argument);
382 }
383 
384 TEST_F(TestManagerImpl, AbortDaqAsyncAbortsDaq) {
385  // Setup
386  StartDaq1();
387 
388  auto reply_status = daq::Status("daq1", "fileid");
389  reply_status.state = State::AbortingAcquiring;
390  EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Strict))
391  .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
392 
393  // Run
394  auto fut = m_manager.AbortDaqAsync("daq1"sv, ErrorPolicy::Strict);
395  auto result = ExpectNoException(fut);
396  EXPECT_EQ(result.state, State::AbortingAcquiring);
397  EXPECT_FALSE(result.error);
398 }
399 
400 TEST_F(TestManagerImpl, UpdateKeywordsUpdatesKeywords) {
401  // Setup
402  StartDaq1();
403 
404  daq::fits::KeywordVector keywords = {
405  daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
406  daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
407  EXPECT_CALL(*m_daq1, UpdateKeywords(keywords));
408 
409  // Run
410  m_manager.UpdateKeywords("daq1"sv, keywords);
411 }
412 
413 TEST_F(TestManagerImpl, UpdateKeywordsForNonexistingDaqThrowsInvalidArgument) {
414  daq::fits::KeywordVector keywords = {
415  daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
416  daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
417  EXPECT_THROW(m_manager.UpdateKeywords("nonexistant-id"sv, keywords), std::invalid_argument);
418 }
419 
420 TEST_F(TestManagerImpl, GetStatus) {
421  // Setup
422  StartDaq1();
423 
424  // Run
425  auto status = m_manager.GetStatus("daq1"sv);
426  EXPECT_EQ(status.id, "daq1");
427  EXPECT_EQ(status.state, daq::State::NotStarted);
428  EXPECT_FALSE(status.error);
429 }
430 
431 TEST_F(TestManagerImpl, GetStatusThrowsIfDaqDoesNotExist) {
432  // Run
433  EXPECT_CALL(m_workspace, LoadArchivedStatus("nonexistant")).WillOnce(Return(std::nullopt));
434  EXPECT_THROW(m_manager.GetStatus("nonexistant"sv), std::invalid_argument);
435 }
436 
437 TEST_F(TestManagerImpl, GetStatusReturnsArchivedStatus) {
438  // Run
439  auto status = Status("id", "id");
440  EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
441  EXPECT_EQ(m_manager.GetStatus("archived"sv), status);
442 }
443 
444 TEST_F(TestManagerImpl, AwaitDaqStateWorksForCompletedArchivedDaqs) {
445  // Run
446  auto status = Status("id", "id");
447  status.state = State::Completed;
448  EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
449  auto fut = m_manager.AwaitDaqStateAsync("archived"sv, daq::State::Completed, 10ms);
450  m_io_ctx.poll();
451  ASSERT_TRUE(fut.is_ready());
452  auto [timeout, result] = fut.get();
453  EXPECT_FALSE(timeout);
454  EXPECT_EQ(result, status);
455 }
456 
457 TEST_F(TestManagerImpl, AwaitDaqStateReturnsExceptionalFutureForIncompleteArchivedDaqs) {
458  // Run
459  auto status = Status("id", "id");
460  status.state = State::Collecting;
461  EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
462  auto fut = m_manager.AwaitDaqStateAsync("archived"sv, daq::State::Completed, 10ms);
463  m_io_ctx.poll();
464  ASSERT_TRUE(fut.is_ready());
465  EXPECT_THROW(fut.get(), std::invalid_argument);
466 }
467 
468 TEST_F(TestManagerImpl, AwaitDaqStateReturnsReadyFutureIfConditionIsFulfilled) {
469  // Run
470  StartDaq1();
471  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::NotStarted, 10ms);
472  m_io_ctx.poll();
473 
474  ASSERT_TRUE(fut.is_ready());
475  auto [timeout, result] = fut.get();
476  EXPECT_FALSE(timeout);
477  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
478 }
479 
480 TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenConditionIsFulfilled) {
481  // Run
482  StartDaq1();
483  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 10ms);
484  ASSERT_FALSE(fut.is_ready());
485  m_daq1_status->SetState(daq::State::Acquiring);
486  MakeTestProgress(m_io_ctx, &fut);
487 
488  ASSERT_TRUE(fut.is_ready());
489  auto [timeout, result] = fut.get();
490  EXPECT_FALSE(timeout);
491  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
492 }
493 
494 TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenItTimesout) {
495  // Run
496  StartDaq1();
497  auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 0ms);
498  ASSERT_FALSE(fut.is_ready());
499  MakeTestProgress(m_io_ctx, &fut);
500 
501  ASSERT_TRUE(fut.is_ready()) << "Timer should have triggered to make the future ready";
502  auto [timeout, result] = fut.get();
503  EXPECT_TRUE(timeout);
504  EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
505 }
506 
507 TEST_F(TestManagerImpl, StoppedDaqTransitionsToNotScheduledAndIsScheduledThenCompleted) {
508  // Setup
509  StartDaq1();
510 
511  // Test
512  // Simulate completed Acquisition Phase
513  m_daq1_status->SetState(State::Stopped);
514 
515  EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
516  EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
517 
518  EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
519  .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Scheduled))));
520  // When DAQ transitions to using DpmDaqController workspace is updated
521  EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
522  EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
523 
524  // Run pending completion handlers
525  m_io_ctx.poll();
526 
527  // After poll we expect Manager to have created DPM version of daqcontroller and transitioned it
528  // to NotScheduled.
529  EXPECT_EQ(m_daq1_status->GetState(), State::NotScheduled);
530 
531  auto daqs = m_manager.GetDaqControllers();
532  ASSERT_EQ(daqs.size(), 1u);
533  EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
534 
535  // Simulate start and then completion emitting state changes in ObservableStatus.
536  // Although signal observers are called synchronously we still pump event loop in case
537  // any callbacks are scheduled.
538  {
539  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq1_status->GetId())).Times(1);
540  m_daq1_status->SetState(State::Merging);
541  m_io_ctx.restart();
542  m_io_ctx.poll();
543  }
544 
545  {
546  EXPECT_CALL(*m_dpm_client, StopMonitorStatus(m_daq_ctx_1.id)).Times(1);
547  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id)).Times(1);
548 
549  m_daq1_status->SetState(State::Completed);
550  m_io_ctx.restart();
551  m_io_ctx.poll();
552  EXPECT_FALSE(m_manager.HaveDaq(m_daq_ctx_1.id));
553  }
554 }
555 
556 TEST_F(TestManagerImpl, RestoreFromWorkspaceAddsDaq) {
557  // Setup
558  // daq1 should create OCM daq controller
559  // daq2 should create DPM daq controller
560  //
561  // Set time to be not stale
562  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
563  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
564  m_daq1_status->SetState(State::Acquiring);
565  m_daq2_status->SetState(State::Merging);
566 
567  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
568 
569  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
570 
571  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
572  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
573  .WillOnce(Return(m_daq1_status->GetStatus()));
574  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(0);
575 
576  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
577  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
578  .WillOnce(Return(m_daq2_status->GetStatus()));
579  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id));
580 
581  EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
582 
583  // Test
584  m_manager.RestoreFromWorkspace();
585 }
586 
587 TEST_F(TestManagerImpl, RestoreFromWorkspaceArchivesStaleDaqs) {
588  // Setup
589  // daq1 should be archived
590  // daq2 should create DPM daq controller
591  //
592  // Set time to be not stale
593  m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
594  m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
595 
596  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
597 
598  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
599 
600  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
601  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
602  .WillOnce(Return(m_daq1_status->GetStatus()));
603  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
604 
605  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
606  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
607  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
608  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
609 
610  EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
611 
612  // Test
613  m_manager.RestoreFromWorkspace();
614 }
615 
616 TEST_F(TestManagerImpl, RestoreFromWorkspaceSkipsMissingDaq) {
617  // Setup
618  // daq1 should be archived
619  // daq2 should create DPM daq controller
620  //
621  // Set time to be not stale
622  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
623  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
624 
625  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
626  std::vector<std::string> to_store = {m_daq_ctx_2.id};
627 
628  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
629 
630  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
631  .WillOnce(Throw(std::runtime_error("ouch")));
632  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
633  .Times(Between(0, 1))
634  .WillRepeatedly(Return(m_daq1_status->GetStatus()));
635  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
636  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
637 
638  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
639  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
640  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
641  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
642 
643  EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
644 
645  // Test
646  m_manager.RestoreFromWorkspace();
647 }
648 
649 TEST_F(TestManagerImpl, RestoreFromWorkspaceIgnoresArchiveFailure) {
650  // Setup
651  // daq1 should be archived
652  // daq2 should create DPM daq controller
653  //
654  // Set time to be not stale
655  m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
656  m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
657  std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
658  std::vector<std::string> to_store = {m_daq_ctx_2.id};
659 
660  EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
661 
662  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
663  .WillOnce(Throw(std::runtime_error("ouch")));
664  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
665  .Times(Between(0, 1))
666  .WillRepeatedly(Return(m_daq1_status->GetStatus()));
667  EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
668  .WillOnce(Throw(std::runtime_error("ouch")));
669  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
670 
671  EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
672  EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
673  .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
674 
675  EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
676 
677  EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
678 
679  // Test
680  m_manager.RestoreFromWorkspace();
681 }
682 
683 TEST_F(TestManagerImpl, ContextSignalUpdatesWorkspace) {
684  // Setup
685  StartDaq1();
686 
687  EXPECT_CALL(m_workspace, StoreContext(Field(&DaqContext::id, "daq1")));
688 
689  // Run
690  m_daq1->signal(m_daq_ctx_1);
691 }
Started operation was aborted.
Definition: error.hpp:47
Combined mock and fake of interface to DPM server.
Implements daq::Manager.
Definition: manager.hpp:254
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:517
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:465
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:577
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:83
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::DpmClient
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
DaqContext m_daq_ctx_1
void TearDown() override
Definition: testManager.cpp:53
ManagerImpl m_manager
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
std::string m_daq_id_2
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
Definition: testManager.cpp:78
void TearDown() override
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
void SetUp() override
Definition: testManager.cpp:51
std::string m_daq_id_1
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
DaqContext m_daq_ctx_2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
std::shared_ptr< DpmClientMock > m_dpm_client
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
Simple observer used for testing.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
Definition: keyword.hpp:330
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hiearchical keyword.
Definition: keyword.hpp:337
TEST_F(TestDpmClient, StartMonitoringSendsRequestAndReceivesReply)
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
@ Completed
Completed DAQ.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
Configurations parameters directly related to manager.
Definition: manager.hpp:36
T ExpectNoException(boost::future< T > &f)
Definition: testManager.cpp:29
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:60
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
std::string id
Definition: status.hpp:174
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:154
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Mock of DaqController.
Defines shared test utilities.