ifw-daq 3.1.0
IFW Data Acquisition modules
scheduler.hpp
1/**
2 * @file
3 * @ingroup daq_dpm
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daq::dpm::Scheduler and related class declarations.
9 */
10#ifndef DAQ_DPM_SCHEDULER_HPP
11#define DAQ_DPM_SCHEDULER_HPP
12#include <daq/config.hpp>
13
14#include <functional>
15#include <vector>
16
17#include <boost/signals2.hpp>
18#include <fmt/ostream.h>
19#include <log4cplus/logger.h>
20#include <rad/ioExecutor.hpp>
21
23#include <daq/json/dpSpec.hpp>
26#include <daq/resourceToken.hpp>
27#include <daq/status.hpp>
28
29namespace daq::dpm {
30
31class Workspace;
32class DaqWorkspace;
33struct Resources;
34
35/**
36 * Controller for specific DAQ.
37 *
38 * Responsibilities:
39 *
40 * State::Scheduled for each state:
41 * 1. Establish list of missing sources and create and store source mapping.
42 * 2. Transition to Transferring when source mapping has been stored successfully.
43 *
44 * State::Collecting
45 * 1. Use source mapping from previous state and query workspace if source has been downloaded.
46 * 2. Transfer missing files.
47 * 3. Transition to Merging when all files have been successfully transferred.
48 *
49 * State::Merging
50 * 1. ...
51 * N. Transition to Releasing
52 *
53 * State::Releasing
54 * 1. ...
55 *
56 * State::Complete
57 * - Nothing
58 *
59 * DaqController cannot assume that all activities for each state are complete.
60 */
61class DaqController {
62public:
63 virtual ~DaqController() {
64 }
65 /**
66 * @returns data acquisition identifier.
67 */
68 virtual auto GetId() const noexcept -> std::string const& = 0;
69
70 /**
71 * @return Error flag.
72 */
73 virtual auto GetErrorFlag() const noexcept -> bool = 0;
74
75 /**
76 * @returns state of data acquisition.
77 */
78 virtual auto GetState() const noexcept -> State = 0;
79
80 /**
81 * @returns Data product filename.
82 */
83 virtual auto GetResult() const noexcept -> std::filesystem::path const& = 0;
84
85 /**
86 * @returns status object associated with this daq.
87 */
88 virtual auto GetStatus() noexcept -> ObservableStatus& = 0;
89 virtual auto GetStatus() const noexcept -> ObservableStatus const& = 0;
90
91 /**
92 * Start/stop operations.
93 *
94 * This assumes the implementation has an internal scheduler that it uses.
95 */
96 virtual void Start() = 0;
97 virtual void Stop() = 0;
98 virtual void Poll() = 0;
99 virtual bool IsStopped() const noexcept = 0;
100};
101
102std::ostream& operator<<(std::ostream& os, DaqController const& daq);
103
104/**
105 * Imposes limits on how many concurrent operations are allowed.
106 */
108 /**
109 * Limits how many DAQs overall can be scheduled concurrently.
110 */
111 unsigned short daq = 1;
112
113 /**
114 * Maximum number of concurrent output transfers.
115 */
116 unsigned short net_send = 0;
117
118 /**
119 * Maximum number of concurrent input transfers.
120 */
121 unsigned short net_receive = 0;
122
123 /**
124 * Maximum number of concurrent merge processes.
125 */
126 unsigned short merge = 1;
127};
128
129/**
130 * Options controlling scheduler operations.
131 *
132 * A limit with value `0` means unlimited.
133 */
134struct SchedulerOptions {
135 /**
136 * Imposes limits on how many concurrent operations are allowed.
137 */
139 /**
140 * Limits how many DAQs overall can be scheduled concurrently.
141 */
142 unsigned short daq = 1;
143
144 /**
145 * Maximum number of concurrent output transfers.
146 */
147 unsigned short net_send = 0;
148
149 /**
150 * Maximum number of concurrent input transfers.
151 */
152 unsigned short net_receive = 0;
153
154 /**
155 * Maximum number of concurrent merge processes.
156 */
157 unsigned short merge = 0;
158 } concurrency_limits;
159};
160
161/**
162 * Options for DaqController
163 */
164struct DaqControllerOptions {
165 std::string merge_bin = "daqDpmMerge";
166 std::string rsync_bin = "rsync";
167};
168
169/**
170 * Limited resources
171 */
172struct Resources {
177};
178
179struct ResourcesConnections {
180 boost::signals2::scoped_connection daqs;
181 boost::signals2::scoped_connection net_send;
182 boost::signals2::scoped_connection net_receive;
183 boost::signals2::scoped_connection merge;
184};
185
186/**
187 * Schedules asynchronous activities that result in merged Data Product and delivery.
188 *
189 * Internally it maintains a prioritized queue of DAQs that should be merged.
190 *
191 * Main responsibilities:
192 *
193 * - Maintain DAQ queue in priority order.
194 * - Process each queued DAQ in order (possibly with configurable parallelization):
195 * - Ensure* input source files are transferred.
196 * - Ensure* DAQ is eventually merged to create DP.
197 * - Update DAQ state to reflect the state it is in.
198 * - State persistence
199 * - Store state to workspace and
200 * - Load state to recover at startup.
201 *
202 * Outside the scope of Scheduler:
203 *
204 * - Prerequisite for DAQ is that the Data Product Specification is final and can be considered
205 * "read-only" within the Scheduler. This means that e.g. the input file names have been
206 * determined.
207 *
208 * TBD/notes:
209 * - Workspace management (should be same component responsible for):
210 * - Who finalizes DP Specification (local filenames)?
211 * Local file names are missing and need to be determined (ensuring that files do not collide).
212 * - Who initializes workspace with new DAQs? It should be the entity that decides about file
213 * names (how about daq::dpm::Workspace?).
214 * - Who archives DAQs?
215 * - Only DAQs that are actively being worked on need to be represented in Scheduler. The remaining
216 * backlog could just be entries in a file `<workspace>/queue.json`.
217 * - This would imply that Scheduler is at least responsible for:
218 * - Maintaining `queue.json`
219 * - Interface uses strings rather than fat objects.
220 * - It creates fat objects
221 */
222class Scheduler {
223public:
224 virtual ~Scheduler() {
225 }
226
227 /**
228 * Start/stop operations.
229 *
230 * This assumes the implementation has an internal scheduler that it uses.
231 */
232 virtual void Start() = 0;
233 virtual void Stop() = 0;
234
235 /**
236 * Queues DAQ for processing.
237 *
238 * @note Scheduler is responsible for creating unique local names for input files.
239 *
240 * @param dp_spec JSON encoded Data Product Specification. If parsing fails a
241 * std::invalid_argument will be thrown.
242 * @param status JSON encoded daq::Status. May be an empty string in which case a default
243 * initial status will be used.
244 * @returns DAQ id on success.
245 * @throw std::invalid_argument if dp_spec is invalid or is already queued.
246 */
247 virtual std::string QueueDaq(std::string const& dp_spec, std::string const& status) = 0;
248
249 /**
250 * Abort merging DAQ identified by @c id.
251 *
252 * @note Workspace related to this will be purged.
253 *
254 * @param id DAQ id.
255 *
256 * @throw std::invalid_argument if DAQ is unknown.
257 */
258 virtual void AbortDaq(std::string const& id) = 0;
259
260 /**
261 * Queries if DAQ with ID has been queued before in the current workspace.
262 *
263 * @param id DAQ id.
264 * @return true if DAQ is in the merge queue, false otherwise.
265 * @throw std::invalid_argument if DAQ is not known.
266 */
267 virtual bool IsQueued(std::string const& id) const noexcept = 0;
268
269 /**
270 * Queries current DAQ status, possibly from last recorded status in workspace.
271 *
272 * @param id DAQ id.
273 * @return Merge status of @a id.
274 * @throw std::invalid_argument if DAQ is not known.
275 */
276 virtual Status GetDaqStatus(std::string const& id) const = 0;
277
278 /**
279 * Queries current DAQ queue.
280 *
281 * @return list of DAQs pending or already started to be merged.
282 */
283 virtual std::vector<std::string> GetQueue() const noexcept = 0;
284
285 /**
286 * Signals
287 */
288 /// @{
289 using StatusSignal = boost::signals2::signal<void(Status const&)>;
290 virtual boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) = 0;
291
292 /// @}
293};
294
295class SchedulerImpl : public Scheduler {
296public:
297 using DaqControllerFactory =
298 std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>, Resources&)>;
299
300 /**
301 * Constructs a scheduler loading information from workspace @a ws.
302 *
303 * The scheduler will load the stored list of queued DAQs and schedule operations to be
304 * performed in priority order.
305 */
307 Workspace& workspace,
308 DaqControllerFactory daq_controller_factory,
309 SchedulerOptions const& options);
310
311 std::string QueueDaq(std::string const& dp_spec, std::string const& status) override;
312 void AbortDaq(std::string const&) override;
313 bool IsQueued(std::string const& id) const noexcept override;
314 Status GetDaqStatus(std::string const& id) const override;
315 std::vector<std::string> GetQueue() const noexcept override;
316
317 /**
318 * @name Signals.
319 *
320 * - DAQ Status
321 */
322 /// @{
323 boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) override;
324 /// @}
325 void Start() override;
326 void Stop() override;
327
328private:
329 /**
330 * Polls for possible activities to start.
331 *
332 * This is the core of the scheduler which will initiate asynchronous activities.
333 *
334 * This should be invoked as external events happen:
335 * - Previously started activities complete (which may allow new activities to start).
336 * - Modifiers invoked (QueueDaq/AbortDaq)
337 * - Limits changed.
338 *
339 * Activities are only started if limits allow them, in DAQ priority order.
340 *
341 * List of activities:
342 *
343 * - Initiate active DAQ.
344 * - State::Collecting
345 * - Transfer all files (identify missing sources and start transfer).
346 * - State::Merging:
347 * - Merge to create Data Product
348 * - State::Releasing:
349 * - Release finished DP to receivers.
350 *
351 * Updates m_active with new DAQs if limits and queue allow.
352 * If active daq is Scheduled it will first be transitioned to Transferring.
353 */
354 void Poll();
355
356 /**
357 * Checks queue for DAQs to start merging, limited on available resources.
358 */
359 void ActivateFromQueue();
360
361 /**
362 * Checks m_active for completed daqs and if completed:
363 * - Erase from m_active
364 * - Erase from queue
365 * - Archive workspace
366 */
367 void ArchiveCompleted();
368
369 /**
370 * Schedules Poll() to be executed at a later time.
371 *
372 * @thread_safe
373 */
374 void DeferredPoll();
375
376 /**
377 * Get candidates for merging (set of DAQs from backlog/queue not in m_active).
378 */
379 std::vector<std::string> GetCandidates() const;
380
381 struct Active {
382 Active(std::unique_ptr<DaqController> daq_arg, ResourceToken token_arg)
383 : daq(std::move(daq_arg)), token(std::move(token_arg)) {
384 assert(daq);
385 }
386 std::unique_ptr<DaqController> daq;
387 ResourceToken token;
388 };
389
390 rad::IoExecutor& m_executor;
391 Workspace& m_workspace;
392 DaqControllerFactory m_daq_controller_factory;
393 SchedulerOptions m_options;
394
395 // Must be ordered before m_active
396 Resources m_resources;
397 ResourcesConnections m_resources_connections;
398
399 /**
400 * Active (transferring files, merging or delivering result) DAQs in priority order.
401 *
402 * Operations for active DAQs are concurrent. There's no synchronization between DAQs.
403 *
404 * Number of active DAQs is limited by m_resources.daq.
405 */
406 std::vector<Active> m_active;
407
408 /**
409 * Container of all DAQs that are currently being processed or are simply in the backlog/queue.
410 * This must be kept synchronized with file system so that it can be relied on for answering
411 * questions like "has this DAQ been queued before".
412 */
413 std::vector<std::string> m_queue;
414
415 StatusSignal m_status_signal;
416 log4cplus::Logger m_logger;
417 /**
418 * A shared pointer value which if valid indicates that Scheduler is alive.
419 * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
420 */
421 std::shared_ptr<bool> m_liveness;
422
423 /**
424 * Indicates if Scheduler is stopped -> shouldn't poll.
425 */
426 bool m_stopped = true;
427};
428
429/**
430 * Internal data structure to SchedulerImpl
431 */
433public:
434 using RsyncFactory =
435 std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
436 std::string const&, // source
437 std::string const&, // dest
438 RsyncOptions const&,
439 RsyncAsyncProcess::DryRun)>;
440
441 using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
442 boost::asio::io_context&, std::vector<std::string> const&)>;
443 /**
444 * Construct controller for existing workspace.
445 * @post State is loaded from @a workspace.
446 *
447 * @throws std::exception (possibly nested) containing error.
448 */
450 std::unique_ptr<DaqWorkspace> workspace,
451 Resources& resources,
452 RsyncFactory rsync_factory,
453 ProcFactory proc_factory,
455
456 void Start() override;
457 void Stop() override;
458 auto IsStopped() const noexcept -> bool override;
459
460 /**
461 * @returns data acquisition identifier.
462 */
463 auto GetId() const noexcept -> std::string const& override;
464
465 /**
466 * @return Error flag.
467 */
468 auto GetErrorFlag() const noexcept -> bool override;
469
470 /**
471 * @returns state of data acquisition.
472 */
473 auto GetState() const noexcept -> State override;
474
475 auto GetResult() const noexcept -> std::filesystem::path const& override {
476 return m_result;
477 }
478
479 /**
480 * @returns status object associated with this daq.
481 */
482 auto GetStatus() noexcept -> ObservableStatus& override;
483 auto GetStatus() const noexcept -> ObservableStatus const& override;
484 void Poll() override;
485
486private:
487 using SourceFile = SourceResolver::SourceFile;
488 void DeferredPoll();
489 struct Scheduled {};
490 struct Collecting {
491 struct Transfer {
493 std::filesystem::path local_path;
494 std::shared_ptr<RsyncAsyncProcessIf> proc;
496 };
497 Collecting(SourceResolver resolver);
498 Collecting(Collecting&&) = default;
499 Collecting& operator=(Collecting&&) = default;
500 /**
501 * Aborts any in-progress transfers.
502 */
503 ~Collecting() noexcept;
504 bool HasTransfer(SourceFile const&) const noexcept;
505 Transfer* GetTransfer(SourceFile const&) noexcept;
506 void EraseTransfer(SourceFile const&);
507
508 SourceResolver resolver;
509 std::vector<Transfer> transfers;
510 };
511 struct Merging {
512 Merging() = default;
513 Merging(Merging&&) = default;
514 Merging& operator=(Merging&&) = default;
515 ~Merging() noexcept;
516 void Reset();
517 std::shared_ptr<AsyncProcessIf> merger = {};
518 std::optional<ResourceToken> token = std::nullopt;
519 };
520 /**
521 * In state releasing we transfer the data product result (m_result) to configured receivers.
522 *
523 * Failures are not treated as fatal.
524 */
525 struct Releasing {
526 Releasing(Releasing&&) = default;
527 Releasing& operator=(Releasing&&) = default;
528 /**
529 * Aborts any in-progress transfers.
530 */
531 ~Releasing() noexcept;
532 /**
533 * Rsync transfer of Data Product to a receiver
534 */
535 struct Transfer {
536 /**
537 * Transfer
538 */
539 std::shared_ptr<RsyncAsyncProcessIf> proc;
540 /**
541 * Token held to implement resource limits.
542 */
544 };
545
546 /**
547 * Active rsync receiver transfers.
548 *
549 * @note that this state is not stored persistently but rather daq::Status should contain
550 * whether a transfer was completed successfully so it can be skipped if DPM is restarted.
551 */
552 std::map<std::size_t, Transfer> transfers;
553 };
554 struct Completed {};
555
556 // Poll methods are allowed to perform state transitions.
557 void Poll(Scheduled&);
558 void Poll(Collecting&);
559 void TransferComplete(SourceFile const& source,
560 std::filesystem::path const& local_path,
561 boost::future<int> result,
562 LogCaptureLast const& log) noexcept;
563
564 void Poll(Merging&);
565 void MergeComplete(boost::future<int> result) noexcept;
566
567 void Poll(Releasing&);
568 /**
569 * Try to start a new release transfer, constrained by resources and soft-stop condition.
570 * @return true if started.
571 * @return false if resource limits were hit.
572 */
573 bool TryStartRelease(Releasing& ctx, json::ReceiverTypes const& receiver, std::size_t index);
574 bool TryStartRelease(Releasing& ctx, json::OlasReceiver const& receiver, std::size_t index);
575
576 /**
577 * Completion handler for a Release Transfer.
578 *
579 * On Success: Update status with success.
580 * On Failure: Update status with failure and increment try count.
581 *
582 * Finally make a deferred poll to schedule new transfers or transition to Complete.
583 */
584 void ReleaseComplete(std::size_t index,
585 AsyncProcessIf const& proc,
586 boost::future<int> result,
587 LogCaptureLast const& log) noexcept;
588
589 void Poll(Completed&);
590
591 /**
592 * Handle JSON message from merger
593 */
594 void HandleMergeMessage(std::string const& line) noexcept;
595
596 using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;
597 static State MakeState(StateVariant const&);
598 /**
599 * Updates m_state and m_status and is the normal way m_state/m_status is updated (except
600 * during construction).
601 */
602 void SetState(StateVariant s);
603
604 /**
605 * Get archive filename (the filename stem only).
606 */
607 std::string GetArchiveFilename() const;
608
609 /**
610 * State persistence
611 */
612 rad::IoExecutor& m_executor;
613 std::unique_ptr<DaqWorkspace> m_workspace;
614 Resources& m_resources;
615 RsyncFactory m_rsync_factory;
616 ProcFactory m_proc_factory;
617 DaqControllerOptions m_options;
618
619 json::DpSpec const m_dpspec;
620 /**
621 * Path to resulting data product (this may contain custom prefixes and is not compatible with
622 * OLAS)
623 */
624 std::filesystem::path m_result;
625
626 StateVariant m_state_ctx;
627
628 /**
629 * Current in-memory status.
630 */
631 ObservableStatus m_status;
632 boost::signals2::scoped_connection m_status_connection;
633
634 /**
635 * A shared pointer value which if valid indicates that Scheduler is alive.
636 * The value itself indicates whether a deferred Poll() has been scheduled but not yet
637 * executed.
638 */
639 std::shared_ptr<bool> m_liveness;
640 bool m_stopped;
641 /**
642 * If true no new async activities should be started. This is mainly used to avoid stopping
643 * abruptly when an error occurs and let already started activities complete.
644 */
645 bool m_soft_stop = false;
646 log4cplus::Logger m_logger;
647};
648
649} // namespace daq::dpm
650
651template <typename T, typename Char>
652struct fmt::
653 formatter<T, Char, std::enable_if_t<std::is_convertible_v<T*, daq::dpm::DaqController*>>>
654 : ostream_formatter {};
655
656#endif // #ifndef DAQ_DPM_SCHEDULER_HPP
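The listing declares one `Poll` overload per state context and a `StateVariant` that holds the current context. One common way to connect the two is `std::visit`, sketched below. This is an illustration under assumptions, not the implementation from the project: `ToyController`, `StateName`, `PollsUntilCompleted`, the `pending_transfers` counter and the transition rules are all hypothetical, and the empty structs stand in for the real contexts.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for the per-state context structs; the real ones hold
// transfers, processes and resource tokens.
struct Scheduled {};
struct Collecting { int pending_transfers = 0; };
struct Merging {};
struct Releasing {};
struct Completed {};

using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;

// Hypothetical controller used only for this sketch.
class ToyController {
public:
    explicit ToyController(int sources) : m_sources(sources) {}

    // Dispatches to the overload matching the alternative currently held.
    void Poll() {
        std::visit([this](auto& ctx) { this->Poll(ctx); }, m_state_ctx);
    }

    std::string StateName() const {
        static constexpr const char* names[] = {
            "Scheduled", "Collecting", "Merging", "Releasing", "Completed"};
        return names[m_state_ctx.index()];
    }

private:
    // Overloads must not touch their argument after SetState(): the
    // assignment destroys the context they were handed.
    void Poll(Scheduled&) { SetState(Collecting{m_sources}); }
    void Poll(Collecting& ctx) {
        if (ctx.pending_transfers > 0) {
            --ctx.pending_transfers;  // pretend one transfer finished
            return;
        }
        SetState(Merging{});
    }
    void Poll(Merging&) { SetState(Releasing{}); }
    void Poll(Releasing&) { SetState(Completed{}); }
    void Poll(Completed&) {}

    void SetState(StateVariant s) { m_state_ctx = std::move(s); }

    int m_sources;
    StateVariant m_state_ctx;  // starts as Scheduled, the first alternative
};

// Polls until the controller reports Completed and returns the poll count.
int PollsUntilCompleted(ToyController& daq) {
    int polls = 0;
    while (daq.StateName() != "Completed") {
        daq.Poll();
        ++polls;
    }
    return polls;
}
```

Keeping each state's data inside its own alternative means that leaving a state destroys that data, which matches the destructors in the listing that abort in-progress transfers.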
daq::AsyncProcess class definition
Interface to asynchronous process.
Abstract factory for DaqControllers.
Logs output to logger and keeps last N lines in circular buffer for later retrieval.
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
Observes any status.
Definition: manager.hpp:88
Interface to interact with DPM workspace.
Definition: workspace.hpp:32
Internal data structure to SchedulerImpl.
Definition: scheduler.hpp:432
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:442
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:439
Controller for specific DAQ.
Definition: scheduler.hpp:61
virtual auto GetState() const noexcept -> State=0
virtual auto GetId() const noexcept -> std::string const &=0
virtual void Stop()=0
virtual void Poll()=0
virtual void Start()=0
Start/stop operations.
virtual auto GetErrorFlag() const noexcept -> bool=0
virtual auto GetStatus() noexcept -> ObservableStatus &=0
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
virtual bool IsStopped() const noexcept=0
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:298
Schedules asynchronous activities that results in merged Data Product and delivery.
Definition: scheduler.hpp:222
virtual ~Scheduler()
Definition: scheduler.hpp:224
virtual void Start()=0
Start/stop operations.
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual std::string QueueDaq(std::string const &dp_spec, std::string const &status)=0
Queues DAQ for processing.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
Definition: scheduler.hpp:289
virtual void Stop()=0
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
Provides location of fits source file.
Interface to interact with DPM workspace.
Definition: workspace.hpp:133
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::signals2::scoped_connection net_receive
Definition: scheduler.hpp:182
boost::signals2::scoped_connection daqs
Definition: scheduler.hpp:180
boost::signals2::scoped_connection merge
Definition: scheduler.hpp:183
boost::signals2::scoped_connection net_send
Definition: scheduler.hpp:181
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:107
Options for DaqController.
Definition: scheduler.hpp:164
Limited resources.
Definition: scheduler.hpp:172
Options controlling scheduler operations.
Definition: scheduler.hpp:134
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:138
std::variant< OlasReceiver > ReceiverTypes
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:77
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ Completed
Completed DAQ.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Options controlling rsync invocation.
daq::RsyncAsyncProcess and related class declarations.
Declares daq::dpm::SourceResolver.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
std::shared_ptr< RsyncAsyncProcessIf > proc
Definition: scheduler.hpp:494
Rsync transfer of Data Product to a receiver.
Definition: scheduler.hpp:535
std::shared_ptr< RsyncAsyncProcessIf > proc
Transfer.
Definition: scheduler.hpp:539
ResourceToken token
Token held to implement resource limits.
Definition: scheduler.hpp:543
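The `m_liveness` comments in the listing describe a `std::shared_ptr<bool>` whose validity signals that the owner is alive and whose value records whether a deferred `Poll()` is pending. A minimal sketch of how such a flag could be used follows. It is an assumption about the pattern, not the project's code: `ToyExecutor`, `ToyScheduler` and the `poll_count` counter are hypothetical, and a plain task queue stands in for `rad::IoExecutor`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded executor: tasks run when RunAll() is called.
class ToyExecutor {
public:
    void Post(std::function<void()> task) { m_tasks.push(std::move(task)); }
    void RunAll() {
        while (!m_tasks.empty()) {
            auto task = std::move(m_tasks.front());
            m_tasks.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> m_tasks;
};

// Hypothetical owner of the liveness flag.
class ToyScheduler {
public:
    ToyScheduler(ToyExecutor& executor, int& poll_count)
        : m_executor(executor)
        , m_poll_count(poll_count)
        , m_liveness(std::make_shared<bool>(false)) {}

    void DeferredPoll() {
        if (*m_liveness) {
            return;  // a deferred poll is already pending: coalesce
        }
        *m_liveness = true;
        // The task only holds a weak reference, so it can detect destruction.
        m_executor.Post([this, weak = std::weak_ptr<bool>(m_liveness)] {
            auto alive = weak.lock();
            if (!alive) {
                return;  // owner destroyed: `this` must not be used
            }
            *alive = false;  // pending poll is now being executed
            Poll();
        });
    }

private:
    void Poll() { ++m_poll_count; }

    ToyExecutor& m_executor;
    int& m_poll_count;
    std::shared_ptr<bool> m_liveness;
};
```

In this sketch two `DeferredPoll()` calls made before the executor runs produce a single `Poll()`, and a task that outlives its `ToyScheduler` does nothing when it finally runs.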