10#ifndef DAQ_DPM_SCHEDULER_HPP
11#define DAQ_DPM_SCHEDULER_HPP
17#include <boost/signals2.hpp>
18#include <fmt/ostream.h>
19#include <log4cplus/logger.h>
68 virtual auto GetId() const noexcept -> std::
string const& = 0;
83 virtual auto
GetResult() const noexcept -> std::filesystem::path const& = 0;
116 unsigned short net_send = 0;
121 unsigned short net_receive = 0;
126 unsigned short merge = 1;
147 unsigned short net_send = 0;
152 unsigned short net_receive = 0;
157 unsigned short merge = 0;
158 } concurrency_limits;
165 std::string merge_bin =
"daqDpmMerge";
166 std::string rsync_bin =
"rsync";
180 boost::signals2::scoped_connection
daqs;
183 boost::signals2::scoped_connection
merge;
247 virtual std::string
QueueDaq(std::string
const& dp_spec, std::string
const& status) = 0;
267 virtual bool IsQueued(std::string
const&
id)
const noexcept = 0;
283 virtual std::vector<std::string>
GetQueue() const noexcept = 0;
290 virtual boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) = 0;
298 std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>,
Resources&)>;
311 std::string QueueDaq(std::string
const& dp_spec, std::string
const& status)
override;
312 void AbortDaq(std::string
const&)
override;
313 bool IsQueued(std::string
const&
id)
const noexcept override;
314 Status GetDaqStatus(std::string
const&
id)
const override;
315 std::vector<std::string> GetQueue() const noexcept override;
323 boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) override;
325 void Start() override;
326 void Stop() override;
359 void ActivateFromQueue();
367 void ArchiveCompleted();
379 std::vector<std::
string> GetCandidates() const;
382 Active(std::unique_ptr<DaqController> daq_arg,
ResourceToken token_arg)
383 :
daq(std::move(daq_arg)), token(std::move(token_arg)) {
386 std::unique_ptr<DaqController>
daq;
393 SchedulerOptions m_options;
396 Resources m_resources;
397 ResourcesConnections m_resources_connections;
406 std::vector<Active> m_active;
413 std::vector<std::string> m_queue;
416 log4cplus::Logger m_logger;
421 std::shared_ptr<bool> m_liveness;
426 bool m_stopped =
true;
435 std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
441 using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
442 boost::asio::io_context&, std::vector<std::string>
const&)>;
450 std::unique_ptr<DaqWorkspace> workspace,
456 void Start()
override;
457 void Stop()
override;
458 auto IsStopped() const noexcept ->
bool override;
463 auto GetId() const noexcept -> std::
string const& override;
468 auto GetErrorFlag() const noexcept ->
bool override;
473 auto GetState() const noexcept ->
State override;
475 auto GetResult() const noexcept -> std::filesystem::path const&
override {
484 void Poll() override;
494 std::shared_ptr<RsyncAsyncProcessIf>
proc;
499 Collecting& operator=(Collecting&&) =
default;
503 ~Collecting() noexcept;
504 bool HasTransfer(
SourceFile const&) const noexcept;
514 Merging& operator=(Merging&&) =
default;
518 std::optional<ResourceToken> token = std::nullopt;
527 Releasing& operator=(Releasing&&) =
default;
531 ~Releasing() noexcept;
539 std::shared_ptr<RsyncAsyncProcessIf>
proc;
552 std::map<std::size_t, Transfer> transfers;
557 void Poll(Scheduled&);
558 void Poll(Collecting&);
559 void TransferComplete(SourceFile
const& source,
560 std::filesystem::path
const& local_path,
561 boost::future<int> result,
565 void MergeComplete(boost::future<int> result)
noexcept;
567 void Poll(Releasing&);
573 bool TryStartRelease(Releasing& ctx,
json::ReceiverTypes const& receiver, std::size_t index);
574 bool TryStartRelease(Releasing& ctx,
json::OlasReceiver const& receiver, std::size_t index);
584 void ReleaseComplete(std::size_t index,
586 boost::future<int> result,
589 void Poll(Completed&);
594 void HandleMergeMessage(std::string
const& line)
noexcept;
596 using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;
602 void SetState(StateVariant s);
607 std::string GetArchiveFilename()
const;
613 std::unique_ptr<DaqWorkspace> m_workspace;
615 RsyncFactory m_rsync_factory;
616 ProcFactory m_proc_factory;
624 std::filesystem::path m_result;
626 StateVariant m_state_ctx;
632 boost::signals2::scoped_connection m_status_connection;
639 std::shared_ptr<bool> m_liveness;
645 bool m_soft_stop =
false;
646 log4cplus::Logger m_logger;
651template <
typename T,
typename Char>
653 formatter<T, Char, std::enable_if_t<std::is_convertible_v<T*, daq::dpm::DaqController*>>>
654 : ostream_formatter {};
daq::AsyncProcess class definition
Interface to asynchronous process.
Abstract factory for DaqControllers.
Logs output to logger and keeps last N lines in circular buffer for later retrieval.
Stores data acquisition status and allows subscription to status changes.
Interface to interact with DPM workspace.
Internal data structure to SchedulerImpl.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
virtual auto GetState() const noexcept -> State=0
virtual auto GetId() const noexcept -> std::string const &=0
virtual void Start()=0
Start/stop operations.
virtual auto GetErrorFlag() const noexcept -> bool=0
virtual auto GetStatus() noexcept -> ObservableStatus &=0
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
virtual bool IsStopped() const noexcept=0
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Schedules asynchronous activities that result in merged Data Product and delivery.
virtual void Start()=0
Start/stop operations.
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual std::string QueueDaq(std::string const &dp_spec, std::string const &status)=0
Queues DAQ for processing.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
Provides location of fits source file.
Interface to interact with DPM workspace.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::signals2::scoped_connection net_receive
boost::signals2::scoped_connection daqs
boost::signals2::scoped_connection merge
boost::signals2::scoped_connection net_send
Imposes limits on how many concurrent operations are allowed.
Options for DaqController.
Options controlling scheduler operations.
Imposes limits on how many concurrent operations are allowed.
std::variant< OlasReceiver > ReceiverTypes
Close representation of the JSON structure but with stronger types.
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Options controlling rsync invocation.
daq::RsyncAsyncProcess and related class declarations.
Declares daq::dpm::SourceResolver.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
std::shared_ptr< RsyncAsyncProcessIf > proc
std::filesystem::path local_path
Rsync transfer of Data Product to a receiver.
std::shared_ptr< RsyncAsyncProcessIf > proc
Transfer.
ResourceToken token
Token held to implement resource limits.