ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
testManager.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_libdaq_test
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief daq::ManagerImpl tests
7 */
8#include <daq/manager.hpp>
9#include <fmt/ostream.h>
10#include <regex>
11
12#include "daq/fits/keyword.hpp"
16#include "mock/mockWorkspace.hpp"
17#include "statusObserver.hpp"
18#include "utils.hpp"
19#include <gtest/gtest.h>
20#include <log4cplus/loggingmacros.h>
21
22#include <daq/error/report.hpp>
23
24using namespace daq;
25using namespace ::testing;
26using namespace std::literals::string_view_literals;
27using namespace std::chrono_literals;
28
/**
 * gmock matcher accepting a keyword whose name, as returned by fits::GetKeywordName(),
 * compares equal to @a name.
 *
 * The matched keyword is also streamed to the result listener.
 */
29MATCHER_P(KeywordNameEq, name, "keyword name equals") {
30 auto kw = fits::GetKeywordName(arg);
31 *result_listener << "keyword is " << (arg);
32 return kw.name == name;
33}
34
/**
 * Base fixture providing a helper to unwrap futures and a keyword formatter mock.
 *
 * NOTE(review): listing line 51 is missing from this extract; other fixtures reference
 * an `m_params` member that is presumably declared there -- confirm against the original.
 */
35struct FixtureBase : ::testing::Test {
    /**
     * Returns the value held by @a f.
     *
     * If the future is not ready a test failure is recorded and std::runtime_error is
     * thrown. If the future holds an exception it is logged to the "test" logger and
     * then rethrown to the caller.
     */
36 template <class T>
37 T ExpectNoException(boost::future<T>& f) {
38 if (!f.is_ready()) {
39 ADD_FAILURE() << "Future is not ready";
40 throw std::runtime_error("test failure");
41 }
42 try {
43 return f.get();
44 } catch (...) {
45 LOG4CPLUS_ERROR("test",
46 "Future contained exception\n"
47 << daq::error::NestedExceptionReporter(std::current_exception()));
48 throw;
49 }
50 }
52 NiceMock<KeywordFormatterMock> m_kw_formatter;
53};
54
55/**
56 * @ingroup daq_ocm_libdaq_test
   *
   * NOTE(review): the class declaration (listing line 58) is missing from this extract.
   * Presumably this is the TestManagerImplLifecycle fixture used by the TEST_F cases
   * further down -- confirm against the original file.
57 */
59public:
    // No per-test setup is performed.
60 void SetUp() override {
61 }
    // No per-test teardown is performed.
62 void TearDown() override {
63 }
64};
65
66/**
67 * @ingroup daq_ocm_libdaq_test
   *
   * NOTE(review): the class declaration and most of the constructor's member initializer
   * list (listing lines 69, 71, 73, 76-82) are missing from this extract. Presumably this
   * is the TestManagerImpl fixture used by the TEST_F cases below -- confirm against the
   * original file. Only the initializers visible here are described.
68 */
70public:
    // Visible initializers: default io_context, a DpmClientMock and an ObservableEventLog.
72 : m_io_ctx()
74 , m_dpm_client(std::make_shared<DpmClientMock>())
75 , m_event_log(std::make_shared<daq::ObservableEventLog>())
83 log4cplus::Logger::getInstance("test")) {
84 }
85 /**
86 * Prepares the fixtures for two data acquisitions ("daq1" and "daq2").
   *
   * Visible steps: assigns ids and file ids, creates the ObservableStatus objects and
   * DaqControllerMock instances (OCM and DPM flavours), installs default behaviour for
   * the keyword formatter mock and sets permissive expectations on the controller mocks.
   *
   * NOTE(review): listing lines 91, 93, 100-101 and 105-106 are missing from this
   * extract; presumably they complete the contexts and register the mocks with the
   * factory -- confirm against the original file.
87 */
88 void SetUp() override {
89 m_daq_id_1 = "daq1";
90 m_daq_id_2 = "daq2";
92 m_daq_ctx_1.file_id = "fileid1";
94 m_daq_ctx_2.file_id = "fileid2";
95
96 m_daq1_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
97 m_daq2_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
98 m_daq1 = std::make_shared<DaqControllerMock>();
99 m_daq2 = std::make_shared<DaqControllerMock>();
102
103 m_dpm_daq1 = std::make_shared<DaqControllerMock>();
104 m_dpm_daq2 = std::make_shared<DaqControllerMock>();
107 // Set up behaviour for formatter such that keywords named "INVALID" or "UNKNOWN" throw
108 // whereas the rest use standard formatting.
109 ON_CALL(m_kw_formatter, Format(_))
110 .WillByDefault(Invoke(
111 static_cast<fits::LiteralKeyword (*)(fits::KeywordVariant const&)>(&fits::Format)));
112 ON_CALL(m_kw_formatter, Format(KeywordNameEq("INVALID")))
113 .WillByDefault(
114 Throw(boost::enable_current_exception(fits::InvalidKeyword("INVALID", "invalid"))));
115 ON_CALL(m_kw_formatter, Format(KeywordNameEq("UNKNOWN")))
116 .WillByDefault(
117 Throw(boost::enable_current_exception(fits::UnknownKeyword("UNKNOWN", {}))));
118
    // OCM controller mocks: accessors may be called any number of times.
119 EXPECT_CALL(*m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_1));
120 EXPECT_CALL(*m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_2));
121 EXPECT_CALL(*m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq1_status));
122 EXPECT_CALL(*m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq2_status));
123 EXPECT_CALL(Const(*m_daq1), GetStatus())
124 .Times(AnyNumber())
125 .WillRepeatedly(Return(m_daq1_status));
126 EXPECT_CALL(Const(*m_daq2), GetStatus())
127 .Times(AnyNumber())
128 .WillRepeatedly(Return(m_daq2_status));
129 EXPECT_CALL(Const(*m_daq1), GetContext())
130 .Times(AnyNumber())
131 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
132
    // Status objects dedicated to the DPM phase, pre-set to Collecting and Merging.
133 m_daq1_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_1, "fileid1");
134 m_daq1_dpm_status->SetState(State::Collecting);
135 m_daq2_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_2, "fileid2");
136 m_daq2_dpm_status->SetState(State::Merging);
137
    // NOTE(review): unlike m_dpm_daq2 (which returns m_daq2_dpm_status), the daq1 DPM mock
    // returns m_daq1_status, i.e. it shares the status object with the OCM mock, and
    // m_daq1_dpm_status is not used elsewhere in the visible code. At least one test drives
    // the DPM phase through m_daq1_status, so this looks intentional -- confirm.
138 EXPECT_CALL(*m_dpm_daq1, GetId()).WillRepeatedly(ReturnRef(m_daq_id_1));
139 EXPECT_CALL(*m_dpm_daq1, GetStatus())
140 .Times(AnyNumber())
141 .WillRepeatedly(Return(m_daq1_status));
142 EXPECT_CALL(Const(*m_dpm_daq1), GetStatus())
143 .Times(AnyNumber())
144 .WillRepeatedly(Return(m_daq1_status));
145 EXPECT_CALL(Const(*m_dpm_daq1), GetContext())
146 .Times(AnyNumber())
147 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
148
149 EXPECT_CALL(*m_dpm_daq2, GetId()).WillRepeatedly(ReturnRef(m_daq_id_2));
150 EXPECT_CALL(*m_dpm_daq2, GetStatus())
151 .Times(AnyNumber())
152 .WillRepeatedly(Return(m_daq2_dpm_status));
153 EXPECT_CALL(Const(*m_dpm_daq2), GetStatus())
154 .Times(AnyNumber())
155 .WillRepeatedly(Return(m_daq2_dpm_status));
156 EXPECT_CALL(Const(*m_dpm_daq2), GetContext())
157 .Times(AnyNumber())
158 .WillRepeatedly(ReturnRef(m_daq_ctx_2));
159 }
160
    /**
     * Sets the expectations needed for starting "daq1": StartAsync() on the controller
     * mock returns a ready future with the current state of m_daq1_status, and the
     * workspace is expected to store the DAQ list, its status (at least once) and its
     * context.
     *
     * NOTE(review): listing line 167, which must hold the return statement (presumably
     * the call that starts the DAQ through the manager), is missing from this extract.
     */
161 auto StartDaq1() -> boost::future<State> {
162 EXPECT_CALL(*m_daq1, StartAsync())
163 .WillOnce(Return(ByMove(boost::make_ready_future<State>(m_daq1_status->GetState()))));
164 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
165 EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1"))).Times(AtLeast(1));
166 EXPECT_CALL(m_workspace, StoreContext(_));
168 }
169
    /**
     * Runs handlers still queued on the io_context and then drops the controller mocks
     * held by the factory.
     */
170 void TearDown() override {
171 // Run any pending handlers to allow mocks to expire without leaking
    // restart() is called first so that poll() runs handlers even if the context was
    // left in the stopped state by an earlier run.
172 m_io_ctx.restart();
173 m_io_ctx.poll();
174
175 m_daq_factory.ocm_mocks.clear();
176 m_daq_factory.dpm_mocks.clear();
177 }
178
179 boost::asio::io_context m_io_ctx;
182 std::shared_ptr<DpmClientMock> m_dpm_client;
183 std::shared_ptr<daq::ObservableEventLog> m_event_log;
184 std::string m_daq_id_1;
185 std::string m_daq_id_2;
188 std::shared_ptr<DaqControllerMock> m_daq1;
189 std::shared_ptr<DaqControllerMock> m_daq2;
190 std::shared_ptr<DaqControllerMock> m_dpm_daq1;
191 std::shared_ptr<DaqControllerMock> m_dpm_daq2;
192 std::shared_ptr<ObservableStatus> m_daq1_status;
193 std::shared_ptr<ObservableStatus> m_daq2_status;
194 std::shared_ptr<ObservableStatus> m_daq1_dpm_status;
195 std::shared_ptr<ObservableStatus> m_daq2_dpm_status;
198};
199
/**
 * Verifies that an observer connected to the manager's status signal is invoked when a
 * DAQ is started through StartDaqAsync().
 *
 * NOTE(review): the declarations of `factory` and of the observer `o` are on listing
 * lines (204, 236) missing from this extract.
 */
200TEST_F(TestManagerImplLifecycle, AddDaqNotifiesObserver) {
201 // Setup
202 boost::asio::io_context io_ctx;
203 rad::IoExecutor executor(io_ctx);
205 auto event_log = std::make_shared<ObservableEventLog>();
206 DaqContext daq_ctx;
207 MockWorkspace workspace;
208 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
209
210 ManagerImpl mgr(executor,
211 m_params,
212 workspace,
213 m_kw_formatter,
214 event_log,
215 factory,
216 dpm_client,
217 log4cplus::Logger::getInstance("test"));
218 auto id = std::string("id");
219 auto status = std::make_shared<ObservableStatus>(id, "fileid");
220 daq_ctx.id = id;
221 daq_ctx.file_id = id;
222 auto daq = std::make_shared<DaqControllerMock>();
    // Register the mock under the id so the factory can hand it out.
223 factory.ocm_mocks[id] = daq;
224
225 EXPECT_CALL(*daq, StartAsync())
226 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
227 EXPECT_CALL(*daq, GetId()).WillRepeatedly(ReturnRef(id));
228 EXPECT_CALL(*daq, GetStatus()).WillRepeatedly(Return(status));
229 EXPECT_CALL(Const(*daq), GetStatus()).WillRepeatedly(Return(status));
230 EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
231
232 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
233 EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
234 EXPECT_CALL(workspace, StoreContext(daq_ctx));
235
    // The observer is expected to be called exactly once (gmock default cardinality).
237 EXPECT_CALL(o, CallOperator(_));
238 mgr.GetStatusSignal().ConnectObserver(std::reference_wrapper(o));
239
240 // Run
241 auto f = mgr.StartDaqAsync(daq_ctx);
242 io_ctx.poll();
243 ExpectNoException(f);
244}
245
/**
 * Verifies that a pending AwaitDaqStateAsync() future becomes ready with
 * DaqOperationAborted after the ManagerImpl that issued it has been destroyed.
 *
 * NOTE(review): the declaration of `factory` is on a listing line (250) missing from
 * this extract.
 */
246TEST_F(TestManagerImplLifecycle, AwaitStateCompletesWithAbandonedManager) {
247 // Setup
248 boost::asio::io_context io_ctx;
249 rad::IoExecutor executor(io_ctx);
251 auto event_log = std::make_shared<ObservableEventLog>();
252 DaqContext daq_ctx;
253 MockWorkspace workspace;
254 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
255
256 auto id = std::string("id");
257 daq_ctx.id = id;
258 daq_ctx.file_id = id;
259 auto status = std::make_shared<ObservableStatus>(id, "fileid");
260 auto daq = std::make_shared<DaqControllerMock>();
261 factory.ocm_mocks[id] = daq;
262 EXPECT_CALL(*daq, StartAsync())
263 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
264 EXPECT_CALL(*daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(id));
265 EXPECT_CALL(*daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
266 EXPECT_CALL(Const(*daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
267 EXPECT_CALL(Const(*daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
268
269 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({"id"})));
270 EXPECT_CALL(workspace, StoreStatus(Field(&Status::id, "id")));
271 EXPECT_CALL(workspace, StoreContext(daq_ctx));
272
    // Declared outside the scope below so the future outlives the manager.
273 boost::future<Result<Status>> res;
274 {
275 ManagerImpl mgr(executor,
276 m_params,
277 workspace,
278 m_kw_formatter,
279 event_log,
280 factory,
281 dpm_client,
282 log4cplus::Logger::getInstance("test"));
283 mgr.StartDaqAsync(daq_ctx);
284
285 // Initiate await that should be aborted when manager is destroyed
286 mgr.AwaitDaqStateAsync("id"sv, State::Acquiring, 5ms).swap(res);
287 ASSERT_FALSE(res.is_ready());
288 }
289
290 // Run
291 MakeTestProgress(io_ctx, &res);
292 ASSERT_TRUE(res.is_ready());
293 EXPECT_THROW(res.get(), DaqOperationAborted);
294}
295
// NOTE(review): the TEST_F header of this test (listing line 296) is missing from this
// extract. The body checks the format of generated DAQ ids:
// "<instrument>.<YYYY-MM-DD>T<hh:mm:ss>.<mmm>".
297 {
    // A long instrument id is expected to appear as its first five characters only.
298 std::regex regex("INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
299 auto id = MakeIdCandidate("INSTRUMENT", 0);
300 EXPECT_TRUE(std::regex_match(id, regex))
301 << "Instrument ID should be truncated to 5 characters if too long";
302 }
303 {
    // The fixture's manager is expected to produce ids with the "INS" prefix.
304 std::regex regex("INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
305 auto id = m_manager.MakeDaqId();
306 EXPECT_TRUE(std::regex_match(id, regex));
307 }
308 {
    // Different jitter values must produce different candidates.
309 auto id1 = MakeIdCandidate("INSTRUMENT", 0);
310 auto id2 = MakeIdCandidate("INSTRUMENT", 1);
311 EXPECT_NE(id1, id2) << "Adding jitter should have made the ID different";
312 }
313}
314
/**
 * Verifies that starting a DAQ with a context that was already used to start one yields
 * a future holding InvalidDaqId.
 */
315TEST_F(TestManagerImpl, StartDaqWithSameIdThrowsInvalidDaqId) {
316 // Setup
    // Only a single StartAsync() is expected: the second start attempt must not reach
    // the controller.
317 EXPECT_CALL(*m_daq1, StartAsync())
318 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
319 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
320 EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
321 EXPECT_CALL(m_workspace, StoreContext(_));
322
323 m_manager.StartDaqAsync(m_daq_ctx_1);
324
325 // Test
326 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
327 m_io_ctx.poll();
328 ASSERT_TRUE(f.is_ready());
329 EXPECT_THROW(f.get(), InvalidDaqId);
330}
331
/**
 * Verifies that starting a DAQ whose context carries a keyword named "UNKNOWN" fails with
 * fits::UnknownKeyword and never calls StartAsync() on the controller.
 *
 * NOTE(review): the declaration of `kws` is on a listing line (336) missing from this
 * extract.
 */
332TEST_F(TestManagerImpl, StartDaqWithUnknownKeywordsThrowsUnknownKeyword) {
333 // Setup
334 EXPECT_CALL(*m_daq1, StartAsync()).Times(0);
335
337 kws.emplace_back(std::in_place_type<fits::EsoKeyword>, "OK", "BAR");
    // The fixture's formatter mock throws fits::UnknownKeyword for this keyword name.
338 kws.emplace_back(std::in_place_type<fits::EsoKeyword>, "UNKNOWN", "BAR");
339
340 m_daq_ctx_1.results.emplace_back("keywords", kws);
341
342 // Test
343 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
344 m_io_ctx.poll();
345 ASSERT_TRUE(f.is_ready());
346 EXPECT_THROW(f.get(), fits::UnknownKeyword);
347}
348
/**
 * Verifies that starting a DAQ whose context carries a keyword named "INVALID" fails with
 * fits::InvalidKeyword and never calls StartAsync() on the controller.
 *
 * NOTE(review): the declaration of `kws` is on a listing line (353) missing from this
 * extract.
 */
349TEST_F(TestManagerImpl, StartDaqWithInvalidKeywordsThrowsInvalidKeyword) {
350 // Setup
351 EXPECT_CALL(*m_daq1, StartAsync()).Times(0);
352
354 kws.emplace_back(std::in_place_type<fits::EsoKeyword>, "OK", "BAR");
    // The fixture's formatter mock throws fits::InvalidKeyword for this keyword name.
355 kws.emplace_back(std::in_place_type<fits::EsoKeyword>, "INVALID", "BAR");
356
357 m_daq_ctx_1.results.emplace_back("keywords", kws);
358
359 // Test
360 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
361 m_io_ctx.poll();
362 ASSERT_TRUE(f.is_ready());
363 EXPECT_THROW(f.get(), fits::InvalidKeyword);
364}
365
/**
 * Verifies the nominal start: the controller's StartAsync() is called once, the workspace
 * stores list, status and context, and the returned future holds the state reported by
 * the controller (State::Starting).
 */
366TEST_F(TestManagerImpl, StartDaqAsyncStartsDaq) {
367 // Setup
368 EXPECT_CALL(*m_daq1, StartAsync())
369 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
370 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
371 EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
372 EXPECT_CALL(m_workspace, StoreContext(_));
373
374 // Run
375 auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
376 m_io_ctx.poll();
377 ASSERT_TRUE(fut.is_ready());
378 EXPECT_EQ(fut.get(), State::Starting);
379}
380
381/**
382 * Tests the behaviour that if DaqController::StartAsync() fails it should automatically
383 * force-abort the acquisition. This is mainly done because putting that responsibility on the user
384 * is not productive.
385 *
386 * note: This behaviour can be made configurable.
387 */
388TEST_F(TestManagerImpl, StartDaqAbortsIfDaqControllerFails) {
389 // Setup
390 EXPECT_CALL(*m_daq1, StartAsync())
391 .WillOnce(Return(
392 ByMove(boost::make_exceptional_future<State>(std::runtime_error("START FAILED")))));
    // The promise lets the test decide when (and how) the abort request completes.
393 boost::promise<daq::Status> abort_reply;
394 EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Tolerant))
395 .WillOnce(Return(ByMove(abort_reply.get_future())));
396 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({"daq1"})));
397 EXPECT_CALL(m_workspace, StoreStatus(Field(&Status::id, "daq1")));
398 EXPECT_CALL(m_workspace, StoreContext(_));
399
400 // Test
401 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
402 m_io_ctx.poll();
    // The start result must not be delivered before the abort has been answered.
403 ASSERT_FALSE(f.is_ready());
404
405 // Send reply
406 abort_reply.set_exception(std::logic_error("ABORT ALSO FAILED"));
407
408 MakeTestProgress(m_io_ctx, &f);
409 ASSERT_TRUE(f.is_ready());
    // The original start failure (std::runtime_error) is expected, not the abort failure
    // (std::logic_error, which is not a std::runtime_error).
410 EXPECT_THROW(f.get(), std::runtime_error);
411}
412
/**
 * Verifies that stopping an id no DAQ was started with immediately yields a ready future
 * holding std::invalid_argument (no event loop processing is needed).
 */
413TEST_F(TestManagerImpl, StopNonexistingDaqThrowsInvalidArgument) {
414 auto fut = m_manager.StopDaqAsync("nonexistant-id"sv, daq::ErrorPolicy::Strict);
415 ASSERT_TRUE(fut.is_ready());
416 EXPECT_THROW(fut.get(), std::invalid_argument);
417}
418
/**
 * Verifies that StopDaqAsync() forwards to the controller's StopAsync() with the same
 * error policy and returns the status the controller replied with.
 *
 * NOTE(review): the declaration of `t` is on a listing line (423) missing from this
 * extract.
 */
419TEST_F(TestManagerImpl, StopDaqAsyncStopsDaq) {
420 // Setup
421 StartDaq1();
422
424 EXPECT_CALL(*m_daq1, StopAsync(daq::ErrorPolicy::Strict))
425 .WillOnce(Return(ByMove(
426 boost::make_ready_future<Status>(Status("daq1", "fileid", State::Stopped, t)))));
427
428 // Run
429 auto fut = m_manager.StopDaqAsync("daq1"sv, daq::ErrorPolicy::Strict);
430 auto status = ExpectNoException(fut);
431 EXPECT_EQ(status.state, State::Stopped);
432 EXPECT_FALSE(HasError(status));
433}
434
/**
 * Verifies that aborting an id no DAQ was started with immediately yields a ready future
 * holding std::invalid_argument, even with ErrorPolicy::Tolerant.
 */
435TEST_F(TestManagerImpl, AbortNonexistingDaqThrowsInvalidArgumentEvenIfTolerant) {
436 auto fut = m_manager.AbortDaqAsync("nonexistant-id"sv, ErrorPolicy::Tolerant);
437 ASSERT_TRUE(fut.is_ready());
438 EXPECT_THROW(fut.get(), std::invalid_argument);
439}
440
/**
 * Verifies that AbortDaqAsync() forwards to the controller's AbortAsync() with the same
 * error policy and returns the status the controller replied with.
 */
441TEST_F(TestManagerImpl, AbortDaqAsyncAbortsDaq) {
442 // Setup
443 StartDaq1();
444
445 auto reply_status = daq::Status("daq1", "fileid");
446 reply_status.state = State::AbortingAcquiring;
447 EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Strict))
448 .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
449
450 // Run
451 auto fut = m_manager.AbortDaqAsync("daq1"sv, ErrorPolicy::Strict);
452 auto result = ExpectNoException(fut);
453 EXPECT_EQ(result.state, State::AbortingAcquiring);
454 EXPECT_FALSE(HasError(result));
455}
456
/**
 * Verifies that UpdateKeywords() on the manager forwards the given keywords unchanged to
 * the controller of the addressed DAQ.
 */
457TEST_F(TestManagerImpl, UpdateKeywordsUpdatesKeywords) {
458 // Setup
459 StartDaq1();
460
461 daq::fits::KeywordVector keywords = {
462 daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
463 daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
464 EXPECT_CALL(*m_daq1, UpdateKeywords(keywords));
465
466 // Run
467 m_manager.UpdateKeywords("daq1"sv, keywords);
468}
469
/**
 * Verifies that UpdateKeywords() throws std::invalid_argument synchronously when the id
 * does not refer to a started DAQ.
 */
470TEST_F(TestManagerImpl, UpdateKeywordsForNonexistingDaqThrowsInvalidArgument) {
471 daq::fits::KeywordVector keywords = {
472 daq::fits::ValueKeyword("OBJECT", "OBJECT,SKY"),
473 daq::fits::EsoKeyword("OBS TPLNO", static_cast<uint64_t>(2))};
474 EXPECT_THROW(m_manager.UpdateKeywords("nonexistant-id"sv, keywords), std::invalid_argument);
475}
476
// NOTE(review): the TEST_F header of this test (listing line 477) is missing from this
// extract. The body checks that GetStatus() for a started DAQ reports its id, the
// NotStarted state and no error.
478 // Setup
479 StartDaq1();
480
481 // Run
482 auto status = m_manager.GetStatus("daq1"sv);
483 EXPECT_EQ(status.id, "daq1");
484 EXPECT_EQ(status.state, daq::State::NotStarted);
485 EXPECT_FALSE(HasError(status));
486}
487
/**
 * Verifies that GetStatus() consults the workspace archive for an id that is not active
 * and throws std::invalid_argument when the archive has no entry either.
 */
488TEST_F(TestManagerImpl, GetStatusThrowsIfDaqDoesNotExist) {
489 // Run
490 EXPECT_CALL(m_workspace, LoadArchivedStatus("nonexistant")).WillOnce(Return(std::nullopt));
491 EXPECT_THROW(m_manager.GetStatus("nonexistant"sv), std::invalid_argument);
492}
493
/**
 * Verifies that GetStatus() returns the status loaded from the workspace archive when
 * the id does not refer to an active DAQ.
 */
494TEST_F(TestManagerImpl, GetStatusReturnsArchivedStatus) {
495 // Run
496 auto status = Status("id", "id");
497 EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
498 EXPECT_EQ(m_manager.GetStatus("archived"sv), status);
499}
500
/**
 * Verifies that awaiting State::Completed for an archived DAQ whose archived status is
 * Completed yields a ready result without timeout that carries the archived status.
 */
501TEST_F(TestManagerImpl, AwaitDaqStateWorksForCompletedArchivedDaqs) {
502 // Run
503 auto status = Status("id", "id");
504 status.state = State::Completed;
505 EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
506 auto fut = m_manager.AwaitDaqStateAsync("archived"sv, daq::State::Completed, 10ms);
507 m_io_ctx.poll();
508 ASSERT_TRUE(fut.is_ready());
509 auto [timeout, result] = fut.get();
510 EXPECT_FALSE(timeout);
511 EXPECT_EQ(result, status);
512}
513
/**
 * Verifies that awaiting State::Completed for an archived DAQ whose archived status is
 * still Collecting yields a future holding std::invalid_argument.
 */
514TEST_F(TestManagerImpl, AwaitDaqStateReturnsExceptionalFutureForIncompleteArchivedDaqs) {
515 // Run
516 auto status = Status("id", "id");
517 status.state = State::Collecting;
518 EXPECT_CALL(m_workspace, LoadArchivedStatus("archived")).WillOnce(Return(status));
519 auto fut = m_manager.AwaitDaqStateAsync("archived"sv, daq::State::Completed, 10ms);
520 m_io_ctx.poll();
521 ASSERT_TRUE(fut.is_ready());
522 EXPECT_THROW(fut.get(), std::invalid_argument);
523}
524
/**
 * Verifies that awaiting a state the DAQ is already in (NotStarted) completes after a
 * single poll, without timeout, and carries the current status.
 */
525TEST_F(TestManagerImpl, AwaitDaqStateReturnsReadyFutureIfConditionIsFulfilled) {
526 // Run
527 StartDaq1();
528 auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::NotStarted, 10ms);
529 m_io_ctx.poll();
530
531 ASSERT_TRUE(fut.is_ready());
532 auto [timeout, result] = fut.get();
533 EXPECT_FALSE(timeout);
534 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
535}
536
/**
 * Verifies that a pending await completes, without timeout, once the DAQ status changes
 * to the awaited state (Acquiring).
 */
537TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenConditionIsFulfilled) {
538 // Run
539 StartDaq1();
540 auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 10ms);
541 ASSERT_FALSE(fut.is_ready());
    // Fulfil the awaited condition through the observable status.
542 m_daq1_status->SetState(daq::State::Acquiring);
543 MakeTestProgress(m_io_ctx, &fut);
544
545 ASSERT_TRUE(fut.is_ready());
546 auto [timeout, result] = fut.get();
547 EXPECT_FALSE(timeout);
548 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
549}
550
/**
 * Verifies that an await whose condition is never fulfilled completes with the timeout
 * flag set and the current status once the (0 ms) timer has expired.
 */
551TEST_F(TestManagerImpl, AwaitDaqStateIsReadyWhenItTimesout) {
552 // Run
553 StartDaq1();
554 auto fut = m_manager.AwaitDaqStateAsync("daq1"sv, daq::State::Acquiring, 0ms);
    // Even with a zero timeout the future only becomes ready after the event loop ran.
555 ASSERT_FALSE(fut.is_ready());
556 MakeTestProgress(m_io_ctx, &fut);
557
558 ASSERT_TRUE(fut.is_ready()) << "Timer should have triggered to make the future ready";
559 auto [timeout, result] = fut.get();
560 EXPECT_TRUE(timeout);
561 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
562}
563
/**
 * Walks one DAQ through the hand-over from acquisition to merging:
 * Stopped -> NotScheduled (manager switches to the DPM controller and schedules the
 * merge) -> Merging (DPM status monitoring starts) -> Completed (monitoring stops, the
 * DAQ is archived and no longer known to the manager).
 */
564TEST_F(TestManagerImpl, StoppedDaqTransitionsToNotScheduledAndIsScheduledThenCompleted) {
565 // Setup
566 StartDaq1();
567
568 // Test
569 // Simulate completed Acquisition Phase
570 m_daq1_status->SetState(State::Stopped);
571
572 EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
573 EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
574
575 EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
576 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Scheduled))));
577 // When DAQ transitions to using DpmDaqController workspace is updated
578 EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
579 EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
580
581 // Run pending completion handlers
582 m_io_ctx.poll();
583
584 // After poll we expect Manager to have created DPM version of daqcontroller and transitioned it
585 // to NotScheduled.
586 EXPECT_EQ(m_daq1_status->GetState(), State::NotScheduled);
587
588 auto daqs = m_manager.GetDaqControllers();
589 ASSERT_EQ(daqs.size(), 1u);
    // The only controller left must be the DPM mock, i.e. the OCM one was replaced.
590 EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
591
592 // Simulate start and then completion emitting state changes in ObservableStatus.
593 // Although signal observers are called synchronously we still pump event loop in case
594 // any callbacks are scheduled.
595 {
596 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq1_status->GetId())).Times(1);
597 m_daq1_status->SetState(State::Merging);
598 m_io_ctx.restart();
599 m_io_ctx.poll();
600 }
601
602 {
603 EXPECT_CALL(*m_dpm_client, StopMonitorStatus(m_daq_ctx_1.id)).Times(1);
604 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id)).Times(1);
605
606 m_daq1_status->SetState(State::Completed);
607 m_io_ctx.restart();
608 m_io_ctx.poll();
609 EXPECT_FALSE(m_manager.HaveDaq(m_daq_ctx_1.id));
610 }
611}
612
/**
 * Verifies that RestoreFromWorkspace() re-creates both DAQs listed in the workspace when
 * neither is stale: context and status are loaded for each, status monitoring is started
 * for the merging DAQ (daq2) only, and the unchanged list is stored back.
 */
613TEST_F(TestManagerImpl, RestoreFromWorkspaceAddsDaq) {
614 // Setup
615 // daq1 should create OCM daq controller
616 // daq2 should create DPM daq controller
617 //
618 // Set time to be not stale
619 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
620 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
621 m_daq1_status->SetState(State::Acquiring);
622 m_daq2_status->SetState(State::Merging);
623
624 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
625
626 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
627
628 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
629 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
630 .WillOnce(Return(m_daq1_status->GetStatus()));
    // daq1 is still acquiring, so no DPM status monitoring may be started for it.
    // (This previously named m_daq_ctx_2.id, which the later expectation for daq2 below
    // shadowed, so the Times(0) never took effect.)
631 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
632
633 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
634 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
635 .WillOnce(Return(m_daq2_status->GetStatus()));
636 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id));
637
638 EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
639
640 // Test
641 m_manager.RestoreFromWorkspace();
642}
643
/**
 * Verifies that RestoreFromWorkspace() archives DAQs whose creation time is at least the
 * configured stale age in the past, and stores an empty DAQ list afterwards.
 */
644TEST_F(TestManagerImpl, RestoreFromWorkspaceArchivesStaleDaqs) {
645 // Setup
646 // daq1 should be archived
647 // daq2 should be archived as well
648 //
649 // Set creation times back by the respective stale age so both DAQs count as stale
650 m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
651 m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
652
653 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
654
655 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
656
657 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
658 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
659 .WillOnce(Return(m_daq1_status->GetStatus()));
660 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
661
662 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
663 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
664 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
665 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
666
    // Nothing remains active, so an empty list is expected to be stored.
667 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
668
669 // Test
670 m_manager.RestoreFromWorkspace();
671}
672
/**
 * Verifies that a DAQ whose context cannot be loaded is archived and skipped while the
 * remaining DAQ is still restored; only the restored DAQ is stored back in the list.
 */
673TEST_F(TestManagerImpl, RestoreFromWorkspaceSkipsMissingDaq) {
674 // Setup
675 // daq1 should be archived
676 // daq2 should create DPM daq controller
677 //
678 // Set time to be not stale
679 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
680 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
681
682 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
683 std::vector<std::string> to_store = {m_daq_ctx_2.id};
684
685 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
686
    // Loading the context of daq1 fails.
687 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
688 .WillOnce(Throw(std::runtime_error("ouch")));
    // The status may or may not be loaded for daq1; the test accepts either.
689 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
690 .Times(Between(0, 1))
691 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
692 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
693 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
694
695 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
696 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
697 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
698 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
699
700 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
701
702 // Test
703 m_manager.RestoreFromWorkspace();
704}
705
/**
 * Verifies that a failure to archive a DAQ that could not be loaded does not prevent the
 * remaining DAQ from being restored: RestoreFromWorkspace() must not propagate the
 * exception thrown by ArchiveDaq() and still stores the list containing daq2 only.
 */
706TEST_F(TestManagerImpl, RestoreFromWorkspaceIgnoresArchiveFailure) {
707 // Setup
708 // daq1 should be archived (the archive attempt itself fails in this test)
709 // daq2 should create DPM daq controller
710 //
711 // Set time to be not stale
712 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
713 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
714 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
715 std::vector<std::string> to_store = {m_daq_ctx_2.id};
716
717 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
718
719 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
720 .WillOnce(Throw(std::runtime_error("ouch")));
721 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
722 .Times(Between(0, 1))
723 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
    // Archiving fails too; this must be tolerated.
724 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
725 .WillOnce(Throw(std::runtime_error("ouch")));
726 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
727
728 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
729 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
730 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
731
732 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
733
734 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
735
736 // Test
737 m_manager.RestoreFromWorkspace();
738}
739
/**
 * Verifies that emitting the controller's context signal makes the manager store the
 * context of that DAQ in the workspace.
 */
740TEST_F(TestManagerImpl, ContextSignalUpdatesWorkspace) {
741 // Setup
742 StartDaq1();
743
744 EXPECT_CALL(m_workspace, StoreContext(Field(&DaqContext::id, "daq1")));
745
746 // Run
    // Emit the signal directly on the mock; no event loop processing follows.
747 m_daq1->signal(m_daq_ctx_1);
748}
Started operation was aborted.
Definition: error.hpp:48
Combined mock and fake of interface to DPM server.
Implements daq::Manager.
Definition: manager.hpp:264
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:554
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:498
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:615
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:93
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Indicates keyword is invalid for some reason.
Definition: keyword.hpp:534
Represents the literal 80-character FITS keyword record.
Definition: keyword.hpp:129
Indicates keyword is unknown and cannot be formatted.
Definition: keyword.hpp:543
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::DpmClient
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
DaqContext m_daq_ctx_1
void TearDown() override
Definition: testManager.cpp:62
ManagerImpl m_manager
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
std::string m_daq_id_2
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
Definition: testManager.cpp:88
void TearDown() override
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
void SetUp() override
Definition: testManager.cpp:60
std::string m_daq_id_1
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
DaqContext m_daq_ctx_2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
std::shared_ptr< DpmClientMock > m_dpm_client
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Definition: utils.hpp:44
Simple observer used for testing.
Contains data structure for FITS keywords.
Declaration of daq::Manager
constexpr KeywordNameView GetKeywordName(EsoKeyword const &keyword) noexcept
Get keyword name from keyword.
Definition: keyword.hpp:436
LiteralKeyword Format(KeywordVariant const &keyword)
Definition: keyword.cpp:782
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
std::variant< ValueKeyword, EsoKeyword, LiteralKeyword > KeywordVariant
The different variants of keywords that are supported.
Definition: keyword.hpp:409
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
Definition: keyword.hpp:339
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hierarchical keyword.
Definition: keyword.hpp:346
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:56
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:24
bool HasError(Status const &status) noexcept
Definition: status.cpp:179
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Completed
Completed DAQ.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ NotStarted
Initial state of data acquisition.
Configuration parameters directly related to the manager.
Definition: manager.hpp:46
ManagerParams m_params
Definition: testManager.cpp:51
T ExpectNoException(boost::future< T > &f)
Definition: testManager.cpp:37
NiceMock< KeywordFormatterMock > m_kw_formatter
Definition: testManager.cpp:52
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:63
std::string id
DAQ identifier, possibly provided by the user.
Definition: daqContext.hpp:58
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > dpm_mocks
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Exception indicating the DAQ id was invalid.
Definition: manager.hpp:38
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
std::string id
Definition: status.hpp:184
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:165
MATCHER_P(KeywordNameEq, name, "keyword name equals")
Definition: testManager.cpp:29
TEST_F(TestManagerImplLifecycle, AddDaqNotifiesObserver)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Mock of DaqController.
Defines shared test utilities.