ifw-daq  3.0.1
IFW Data Acquisition modules
scheduler.hpp
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler and related class declarations.
9  */
10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
12 #include <daq/config.hpp>
13 
14 #include <functional>
15 #include <vector>
16 
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
22 #include <daq/json/dpSpec.hpp>
25 #include <daq/resourceToken.hpp>
26 #include <daq/status.hpp>
27 
28 namespace daq::dpm {
29 
30 class Workspace;
31 class DaqWorkspace;
32 struct Resources;
33 
34 /**
35  * Controller for specific DAQ.
36  *
37  * Responsibilities for each state:
38  *
39  * State::Scheduled
40  * 1. Establish the list of missing sources, then create and store the source mapping.
41  * 2. Transition to Collecting when the source mapping has been stored successfully.
42  *
43  * State::Collecting
44  * 1. Use the source mapping from the previous state and query the workspace to check whether each source has been downloaded.
45  * 2. Transfer missing files.
46  * 3. Transition to Merging when all files have been successfully transferred.
47  *
48  * State::Merging
49  * 1. ...
50  * N. Transition to Releasing
51  *
52  * State::Releasing
53  * 1. ...
54  *
55  * State::Completed
56  * - Nothing
57  *
58  * DaqController cannot assume that all activities for each state are complete.
59  */
60 class DaqController {
61 public:
62  virtual ~DaqController() {
63  }
64  /**
65  * @returns data acquisition identifier.
66  */
67  virtual auto GetId() const noexcept -> std::string const& = 0;
68 
69  /**
70  * @return Error flag.
71  */
72  virtual auto GetErrorFlag() const noexcept -> bool = 0;
73 
74  /**
75  * @returns state of data acquisition.
76  */
77  virtual auto GetState() const noexcept -> State = 0;
78 
79  /**
80  * @returns Data product filename.
81  */
82  virtual auto GetResult() const noexcept -> std::filesystem::path const& = 0;
83 
84  /**
85  * @returns status object associated with this daq.
86  */
87  virtual auto GetStatus() noexcept -> ObservableStatus& = 0;
88  virtual auto GetStatus() const noexcept -> ObservableStatus const& = 0;
89 
90  /**
91  * Start/stop operations.
92  *
93  * This assumes the implementation has an internal scheduler that it uses.
94  */
95  virtual void Start() = 0;
96  virtual void Stop() = 0;
97  virtual void Poll() = 0;
98  virtual bool IsStopped() const noexcept = 0;
99 };
100 
101 std::ostream& operator<<(std::ostream& os, DaqController const& daq);
102 
103 /**
104  * Imposes limits on how many concurrent operations are allowed.
105  */
107  /**
108  * Limits how many DAQs overall can be scheduled concurrently.
109  */
110  unsigned short daq = 1;
111 
112  /**
113  * Maximum number of concurrent output transfers.
114  */
115  unsigned short net_send = 0;
116 
117  /**
118  * Maximum number of concurrent input transfers.
119  */
120  unsigned short net_receive = 0;
121 
122  /**
123  * Maximum number of concurrent merge processes.
124  */
125  unsigned short merge = 1;
126 };
127 
128 /**
129  * Options controlling scheduler operations.
130  *
131  * A limit with value `0` means unlimited.
132  */
133 struct SchedulerOptions {
134  /**
135  * Imposes limits on how many concurrent operations are allowed.
136  */
138  /**
139  * Limits how many DAQs overall can be scheduled concurrently.
140  */
141  unsigned short daq = 1;
142 
143  /**
144  * Maximum number of concurrent output transfers.
145  */
146  unsigned short net_send = 0;
147 
148  /**
149  * Maximum number of concurrent input transfers.
150  */
151  unsigned short net_receive = 0;
152 
153  /**
154  * Maximum number of concurrent merge processes.
155  */
156  unsigned short merge = 0;
157  } concurrency_limits;
158 };
159 
160 /**
161  * Options for DaqController
162  */
163 struct DaqControllerOptions {
164  std::string merge_bin = "daqDpmMerge";
165  std::string rsync_bin = "rsync";
166 };
167 
168 /**
169  * Limited resources
170  */
171 struct Resources {
176 };
177 
178 struct ResourcesConnections {
179  boost::signals2::scoped_connection daqs;
180  boost::signals2::scoped_connection net_send;
181  boost::signals2::scoped_connection net_receive;
182  boost::signals2::scoped_connection merge;
183 };
184 
185 /**
186  * Schedules asynchronous activities that result in a merged Data Product and its delivery.
187  *
188  * Internally it maintains a prioritized queue of DAQs that should be merged.
189  *
190  * Main responsibilities:
191  *
192  * - Maintain DAQ queue in priority order.
193  * - Process each queued DAQ in order (possibly with configurable parallelization):
194  * - Ensure* input source files are transferred.
195  * - Ensure* DAQ is eventually merged to create DP.
196  * - Update DAQ state to reflect the state it is in.
197  * - State persistence
198  * - Store state to workspace and
199  * - Load state to recover at startup.
200  *
201  * Outside the scope of Scheduler:
202  *
203  * - Prerequisite for DAQ is that the Data Product Specification is final and can be considered
204  * "read-only" within the Scheduler. This means that e.g. the input file names have been
205  * determined.
206  *
207  * TBD/notes:
208  * - Workspace management (should be same component responsible for):
209  * - Who finalizes DP Specification (local filenames)?
210  * Local file names are missing and need to be determined (ensuring that files do not collide).
211  * - Who initializes workspace with new DAQs? It should be the entity that decides about file
212  * names (how about daq::dpm::Workspace?).
213  * - Who archives DAQs?
214  * - Only DAQs that are actively being worked on need to be represented in Scheduler. The remaining
215  * backlog could just be entries in a file `<workspace>/queue.json`.
216  * - This would imply that Scheduler is at least responsible for:
217  * - Maintaining `queue.json`
218  * - Interface uses strings rather than fat objects.
219  * - It creates fat objects
220  */
221 class Scheduler {
222 public:
223  virtual ~Scheduler() {
224  }
225 
226  /**
227  * Start/stop operations.
228  *
229  * This assumes the implementation has an internal scheduler that it uses.
230  */
231  virtual void Start() = 0;
232  virtual void Stop() = 0;
233 
234  /**
235  * Queues DAQ for processing.
236  *
237  * @note Scheduler is responsible for creating unique local names for input files.
238  *
239  * @param dp_spec JSON encoded Data Product Specification. If parsing fails a
240  * std::invalid_argument will be thrown.
241  *
242  * @returns DAQ id on success.
243  * @throw std::invalid_argument if dp_spec is invalid or is already queued.
244  */
245  virtual std::string QueueDaq(std::string const& dp_spec) = 0;
246 
247  /**
248  * Abort merging DAQ identified by @c id.
249  *
250  * @note Workspace related to this will be purged.
251  *
252  * @param id DAQ id.
253  *
254  * @throw std::invalid_argument if DAQ is unknown.
255  */
256  virtual void AbortDaq(std::string const& id) = 0;
257 
258  /**
259  * Queries if DAQ with ID has been queued before in the current workspace.
260  *
261  * @param id DAQ id.
262  * @return true if DAQ is in the merge queue, false otherwise.
264  */
265  virtual bool IsQueued(std::string const& id) const noexcept = 0;
266 
267  /**
268  * Queries current DAQ status, possibly from last recorded status in workspace.
269  *
270  * @param id DAQ id.
271  * @return Merge status of @a id.
272  * @throw std::invalid_argument if DAQ is not known.
273  */
274  virtual Status GetDaqStatus(std::string const& id) const = 0;
275 
276  /**
277  * Queries current DAQ queue.
278  *
279  * @return list of DAQs pending or already started to be merged.
280  */
281  virtual std::vector<std::string> GetQueue() const noexcept = 0;
282 
283  /**
284  * Signals
285  */
286  /// @{
287  using StatusSignal = boost::signals2::signal<void(Status const&)>;
288  virtual boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) = 0;
289 
290  /// @}
291 };
292 
293 class SchedulerImpl : public Scheduler {
294 public:
295  using DaqControllerFactory =
296  std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>, Resources&)>;
297 
298  /**
299  * Constructs a scheduler loading information from workspace @a ws.
300  *
301  * The scheduler will load the stored list of queued DAQs and schedule operations to be
302  * performed in priority order.
303  */
304  SchedulerImpl(rad::IoExecutor& executor,
305  Workspace& workspace,
306  DaqControllerFactory daq_controller_factory,
307  SchedulerOptions const& options);
308 
309  std::string QueueDaq(std::string const& dp_spec) override;
310  void AbortDaq(std::string const&) override;
311  bool IsQueued(std::string const& id) const noexcept override;
312  Status GetDaqStatus(std::string const& id) const override;
313  std::vector<std::string> GetQueue() const noexcept override;
314 
315  /**
316  * @name Signals.
317  *
318  * - DAQ Status
319  */
320  /// @{
321  boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) override;
322  /// @}
323  void Start() override;
324  void Stop() override;
325 
326 private:
327  /**
328  * Polls for possible activities to start.
329  *
330  * This is the core of the scheduler which will initiate asynchronous activities.
331  *
332  * This should be invoked as external events happen:
333  * - Previously started activities complete (which may allow new activities to start).
334  * - Modifiers invoked (QueueDaq/AbortDaq)
335  * - Limits changed.
336  *
337  * Activities are only started if limits allow them, in DAQ priority order.
338  *
339  * List of activities:
340  *
341  * - Initiate active DAQ.
342  * - State::Collecting
343  * - Transfer all files (identify missing sources and start transfer).
344  * - State::Merging:
345  * - Merge to create Data Product
346  * - State::Releasing:
347  * - Release finished DP to receivers.
348  *
349  * Updates m_active with new DAQs if limits and queue allow.
350  * If the active DAQ is Scheduled it will first be transitioned to Collecting.
351  */
352  void Poll();
353 
354  /**
355  * Checks queue for DAQs to start merging, limited on available resources.
356  */
357  void ActivateFromQueue();
358 
359  /**
360  * Checks m_active for completed daqs and if completed:
361  * - Erase from m_active
362  * - Erase from queue
363  * - Archive workspace
364  */
365  void ArchiveCompleted();
366 
367  /**
368  * Schedules Poll() to be executed at a later time.
369  *
370  * @thread_safe
371  */
372  void DeferredPoll();
373 
374 
375  /**
376  * Get candidates for merging (set of DAQs from backlog/queue not in m_active).
377  */
378  std::vector<std::string> GetCandidates() const;
379 
380  struct Active {
381  Active(std::unique_ptr<DaqController> daq_arg, ResourceToken token_arg)
382  : daq(std::move(daq_arg)), token(std::move(token_arg)) {
383  assert(daq);
384  }
385  std::unique_ptr<DaqController> daq;
386  ResourceToken token;
387  };
388 
389  rad::IoExecutor& m_executor;
390  Workspace& m_workspace;
391  DaqControllerFactory m_daq_controller_factory;
392  SchedulerOptions m_options;
393 
394  // Must be ordered before m_active
395  Resources m_resources;
396  ResourcesConnections m_resources_connections;
397 
398  /**
399  * Active (transferring files, merging or delivering result) DAQs in priority order.
400  *
401  * Operations for active DAQs are concurrent. There's no synchronization between DAQs.
402  *
403  * Number of active DAQs is limited by m_resources.daq.
404  */
405  std::vector<Active> m_active;
406 
407  /**
408  * Container of all DAQs that are currently being processed or are simply in the backlog/queue.
409  * This must be kept synchronized with the file system so that it can be relied on for answering
410  * questions like "has this DAQ been queued before".
411  */
412  std::vector<std::string> m_queue;
413 
414 
415 
416  StatusSignal m_status_signal;
417  log4cplus::Logger m_logger;
418  /**
419  * A shared pointer value which if valid indicates that Scheduler is alive.
420  * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
421  */
422  std::shared_ptr<bool> m_liveness;
423 
424  /**
425  * Indicates if Scheduler is stopped -> shouldn't poll.
426  */
427  bool m_stopped = true;
428 };
429 
430 /**
431  * Internal data structure to SchedulerImpl
432  */
434 public:
435  using RsyncFactory =
436  std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
437  std::string const&, // source
438  std::string const&, // dest
439  RsyncOptions const&,
440  RsyncAsyncProcess::DryRun)>;
441 
442  using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
443  boost::asio::io_context&, std::vector<std::string> const&)>;
444  /**
445  * Construct controller for existing workspace.
446  * @post State is loaded from @a workspace.
447  *
448  * @throws std::exception (possibly nested) describing the error.
449  */
451  std::unique_ptr<DaqWorkspace> workspace,
452  Resources& resources,
453  RsyncFactory rsync_factory,
454  ProcFactory proc_factory,
455  DaqControllerOptions opts);
456 
457  void Start() override;
458  void Stop() override;
459  auto IsStopped() const noexcept -> bool override;
460 
461  /**
462  * @returns data acquisition identifier.
463  */
464  auto GetId() const noexcept -> std::string const& override;
465 
466  /**
467  * @return Error flag.
468  */
469  auto GetErrorFlag() const noexcept -> bool override;
470 
471  /**
472  * @returns state of data acquisition.
473  */
474  auto GetState() const noexcept -> State override;
475 
476  auto GetResult() const noexcept -> std::filesystem::path const& override {
477  return m_result;
478  }
479 
480  /**
481  * @returns status object associated with this daq.
482  */
483  auto GetStatus() noexcept -> ObservableStatus& override;
484  auto GetStatus() const noexcept -> ObservableStatus const& override;
485  void Poll() override;
486 
487 private:
488  using SourceFile = SourceResolver::SourceFile;
489  void DeferredPoll();
490  struct Scheduled {};
491  struct Collecting {
492  struct Transfer {
494  std::filesystem::path local_path;
495  std::shared_ptr<RsyncAsyncProcessIf> proc;
497  };
498  Collecting(SourceResolver resolver);
499  Collecting(Collecting&&) = default;
500  Collecting& operator=(Collecting&&) = default;
501  /**
502  * Aborts any in-progress transfers.
503  */
504  ~Collecting() noexcept;
505  bool HasTransfer(SourceFile const&) const noexcept;
506  Transfer* GetTransfer(SourceFile const&) noexcept;
507  void EraseTransfer(SourceFile const&);
508 
509  SourceResolver resolver;
510  std::vector<Transfer> transfers;
511  };
512  struct Merging {
513  Merging() = default;
514  Merging(Merging&&) = default;
515  Merging& operator=(Merging&&) = default;
516  ~Merging() noexcept;
517  void Reset();
518  std::shared_ptr<AsyncProcessIf> merger = {};
519  std::optional<ResourceToken> token = std::nullopt;
520  };
521  /**
522  * In state releasing we transfer the data product result (m_result) to configured receivers.
523  *
524  * Failures are not treated as fatal.
525  */
526  struct Releasing {
527  Releasing(Releasing&&) = default;
528  Releasing& operator=(Releasing&&) = default;
529  /**
530  * Aborts any in-progress transfers.
531  */
532  ~Releasing() noexcept;
533  /**
534  * Rsync transfer of Data Product to a receiver
535  */
536  struct Transfer {
537  /**
538  * Transfer
539  */
540  std::shared_ptr<RsyncAsyncProcessIf> proc;
541  /**
542  * Token held to implement resource limits.
543  */
544  ResourceToken token;
545  };
546 
547  /**
548  * Active rsync receiver transfers.
549  *
550  * @note that this state is not stored persistently but rather daq::Status should contain
551  * whether a transfer was completed successfully so it can be skipped if DPM is restarted.
552  */
553  std::map<std::size_t, Transfer> transfers;
554  };
555  struct Completed {};
556 
557  // Poll methods are allowed to perform state transitions.
558  void Poll(Scheduled&);
559  void Poll(Collecting&);
560  void TransferComplete(SourceFile const& source,
561  std::filesystem::path const& local_path,
562  boost::future<int> result,
563  LogCaptureLast const& log) noexcept;
564 
565  void Poll(Merging&);
566  void MergeComplete(boost::future<int> result) noexcept;
567 
568  void Poll(Releasing&);
569  /**
570  * Try to start a new release transfer, constrained by resources and soft-stop condition.
571  * @return true if started.
572  * @return false if resource limits were hit.
573  */
574  bool TryStartRelease(Releasing& ctx, json::ReceiverTypes const& receiver, std::size_t index);
575  bool TryStartRelease(Releasing& ctx, json::OlasReceiver const& receiver, std::size_t index);
576 
577  /**
578  * Completion handler for a Release Transfer.
579  *
580  * On Success: Update status with success.
581  * On Failure: Update status with failure and increment try count.
582  *
583  * Finally make a deferred poll to schedule new transfers or transition to Completed.
584  */
585  void ReleaseComplete(std::size_t index,
586  AsyncProcessIf const& proc,
587  boost::future<int> result,
588  LogCaptureLast const& log) noexcept;
589 
590  void Poll(Completed&);
591 
592  /**
593  * Handle JSON message from merger
594  */
595  void HandleMergeMessage(std::string const& line) noexcept;
596 
597  using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;
598  static State MakeState(StateVariant const&);
599  /**
600  * Updates m_state and m_status and is the normal way m_state/m_status is updated (except
601  * during construction).
602  */
603  void SetState(StateVariant s, bool error = false);
604  void SetError(bool error);
605 
606  /**
607  * Get archive filename (the filename stem only).
608  */
609  std::string GetArchiveFilename() const;
610 
611  /**
612  * State persistence
613  */
614  rad::IoExecutor& m_executor;
615  std::unique_ptr<DaqWorkspace> m_workspace;
616  Resources& m_resources;
617  RsyncFactory m_rsync_factory;
618  ProcFactory m_proc_factory;
619  DaqControllerOptions m_options;
620 
621  json::DpSpec const m_dpspec;
622  /**
623  * Path to resulting data product (this may contain custom prefixes and is not compatible with
624  * OLAS)
625  */
626  std::filesystem::path m_result;
627 
628  StateVariant m_state_ctx;
629 
630  /**
631  * Current in-memory status.
632  */
633  ObservableStatus m_status;
634  boost::signals2::scoped_connection m_status_connection;
635 
636  /**
637  * A shared pointer value which if valid indicates that this object is alive.
638  * The value itself indicates whether a deferred Poll() has been scheduled but not yet
639  * executed.
640  */
641  std::shared_ptr<bool> m_liveness;
642  bool m_stopped;
643  /**
644  * If true no new async activities should be started. This is mainly used to avoid stopping
645  * abruptly when an error occurs and let already started activities complete.
646  */
647  bool m_soft_stop = false;
648  log4cplus::Logger m_logger;
649 };
650 
651 } // namespace daq::dpm
652 
653 #endif // #ifndef DAQ_DPM_SCHEDULER_HPP
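The state handling declared in the listing above (`StateVariant`, the per-state `Poll` overloads and `MakeState`) can be illustrated with a reduced sketch. This is not the IFW implementation: the `Controller` class, the `Step` helper and the counters inside `Collecting` and `Merging` are hypothetical stand-ins for real transfers and merge processes, and only the standard library is used.

```cpp
#include <cassert>
#include <optional>
#include <utility>
#include <variant>

// Mirrors the observable states named in the header.
enum class State { Scheduled, Collecting, Merging, Releasing, Completed };

// Per-state context objects. The counters are hypothetical placeholders
// for real work (file transfers, merge process).
struct Scheduled {};
struct Collecting { int pending_transfers = 2; };
struct Merging { bool merged = false; };
struct Releasing {};
struct Completed {};

using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;

class Controller {
public:
    // Maps the active alternative to the State enum. Relies on the enum
    // and the variant listing the states in the same order.
    State GetState() const noexcept {
        assert(!m_state_ctx.valueless_by_exception());
        return static_cast<State>(m_state_ctx.index());
    }

    // Dispatches to the Step overload matching the current state and
    // applies the transition it requests, if any.
    void Poll() {
        std::optional<StateVariant> next =
            std::visit([this](auto& ctx) { return this->Step(ctx); }, m_state_ctx);
        if (next) {
            m_state_ctx = std::move(*next);
        }
    }

private:
    using Next = std::optional<StateVariant>;

    Next Step(Scheduled&) { return StateVariant{Collecting{}}; }
    Next Step(Collecting& ctx) {
        if (ctx.pending_transfers > 0) {
            --ctx.pending_transfers;  // pretend one transfer completed
            return std::nullopt;      // stay in Collecting
        }
        return StateVariant{Merging{}};
    }
    Next Step(Merging& ctx) {
        if (!ctx.merged) {
            ctx.merged = true;  // pretend the merge process finished
            return std::nullopt;
        }
        return StateVariant{Releasing{}};
    }
    Next Step(Releasing&) { return StateVariant{Completed{}}; }
    Next Step(Completed&) { return std::nullopt; }  // nothing left to do

    StateVariant m_state_ctx;  // default-constructs to Scheduled
};
```

Calling `Poll()` repeatedly on a default-constructed `Controller` walks it from `State::Scheduled` to `State::Completed`, after which further polls leave the state unchanged. Returning the next state from `Step` instead of assigning inside the visitor avoids replacing the active alternative while a reference to it is still in use.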
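The `m_liveness` member documented in the listing (a `std::shared_ptr<bool>` whose validity signals that the owner is alive and whose value records a pending deferred `Poll()`) suggests the pattern sketched here. This is an assumption about how such a flag could be used, not the IFW code: `FakeExecutor` and `Poller` are hypothetical, the executor is a plain single-threaded queue standing in for `rad::IoExecutor`, and the sketch does not attempt the thread safety the header documents for `DeferredPoll()`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an executor: a FIFO of callbacks run on demand.
class FakeExecutor {
public:
    void Post(std::function<void()> fn) { m_tasks.push(std::move(fn)); }

    // Runs all queued tasks, including ones posted while running.
    // Returns the number of tasks executed.
    int RunAll() {
        int count = 0;
        while (!m_tasks.empty()) {
            auto fn = std::move(m_tasks.front());
            m_tasks.pop();
            fn();
            ++count;
        }
        return count;
    }

private:
    std::queue<std::function<void()>> m_tasks;
};

class Poller {
public:
    explicit Poller(FakeExecutor& executor)
        : m_executor(executor), m_liveness(std::make_shared<bool>(false)) {
        assert(m_liveness);
    }

    // Requests a Poll() at a later time. Requests made while one is
    // already pending are coalesced into a single Poll().
    void DeferredPoll() {
        if (*m_liveness) {
            return;  // a deferred poll is already scheduled
        }
        *m_liveness = true;
        m_executor.Post([this, weak = std::weak_ptr<bool>(m_liveness)] {
            auto alive = weak.lock();
            if (!alive) {
                return;  // owner was destroyed; `this` must not be used
            }
            *alive = false;  // clear the pending flag before polling
            Poll();
        });
    }

    int PollCount() const noexcept { return m_polls; }

private:
    void Poll() { ++m_polls; }

    FakeExecutor& m_executor;
    std::shared_ptr<bool> m_liveness;  // destroyed together with Poller
    int m_polls = 0;
};
```

Two back-to-back `DeferredPoll()` calls queue a single task, and a task that is still queued when the `Poller` is destroyed runs harmlessly because the `std::weak_ptr` can no longer be locked.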