10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
67 virtual auto GetId() const noexcept -> std::
string const& = 0;
82 virtual auto
GetResult() const noexcept -> std::filesystem::path const& = 0;
115 unsigned short net_send = 0;
120 unsigned short net_receive = 0;
125 unsigned short merge = 1;
146 unsigned short net_send = 0;
151 unsigned short net_receive = 0;
156 unsigned short merge = 0;
157 } concurrency_limits;
164 std::string merge_bin =
"daqDpmMerge";
165 std::string rsync_bin =
"rsync";
179 boost::signals2::scoped_connection
daqs;
182 boost::signals2::scoped_connection
merge;
245 virtual std::string
QueueDaq(std::string
const& dp_spec) = 0;
265 virtual bool IsQueued(std::string
const&
id)
const noexcept = 0;
281 virtual std::vector<std::string>
GetQueue() const noexcept = 0;
288 virtual boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) = 0;
296 std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>,
Resources&)>;
309 std::string QueueDaq(std::string
const& dp_spec)
override;
310 void AbortDaq(std::string
const&)
override;
311 bool IsQueued(std::string
const&
id)
const noexcept
override;
312 Status GetDaqStatus(std::string
const&
id)
const override;
313 std::vector<std::string> GetQueue() const noexcept override;
321 boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) override;
323 void Start() override;
324 void Stop() override;
357 void ActivateFromQueue();
365 void ArchiveCompleted();
378 std::vector<std::
string> GetCandidates() const;
381 Active(std::unique_ptr<DaqController> daq_arg,
ResourceToken token_arg)
382 :
daq(std::move(daq_arg)), token(std::move(token_arg)) {
385 std::unique_ptr<DaqController>
daq;
392 SchedulerOptions m_options;
395 Resources m_resources;
396 ResourcesConnections m_resources_connections;
405 std::vector<Active> m_active;
412 std::vector<std::string> m_queue;
417 log4cplus::Logger m_logger;
422 std::shared_ptr<bool> m_liveness;
427 bool m_stopped =
true;
436 std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
442 using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
443 boost::asio::io_context&, std::vector<std::string>
const&)>;
451 std::unique_ptr<DaqWorkspace> workspace,
457 void Start()
override;
458 void Stop()
override;
459 auto IsStopped() const noexcept ->
bool override;
464 auto
GetId() const noexcept -> std::
string const& override;
476 auto
GetResult() const noexcept -> std::filesystem::path const&
override {
485 void Poll() override;
495 std::shared_ptr<RsyncAsyncProcessIf>
proc;
500 Collecting& operator=(Collecting&&) =
default;
505 bool HasTransfer(
SourceFile const&) const noexcept;
515 Merging& operator=(Merging&&) =
default;
519 std::optional<ResourceToken> token = std::nullopt;
528 Releasing& operator=(Releasing&&) =
default;
540 std::shared_ptr<RsyncAsyncProcessIf>
proc;
553 std::map<std::size_t, Transfer> transfers;
558 void Poll(Scheduled&);
559 void Poll(Collecting&);
560 void TransferComplete(SourceFile
const& source,
561 std::filesystem::path
const& local_path,
562 boost::future<int> result,
566 void MergeComplete(boost::future<int> result) noexcept;
568 void Poll(Releasing&);
574 bool TryStartRelease(Releasing& ctx,
json::ReceiverTypes const& receiver, std::size_t index);
575 bool TryStartRelease(Releasing& ctx,
json::OlasReceiver const& receiver, std::size_t index);
585 void ReleaseComplete(std::size_t index,
587 boost::future<int> result,
590 void Poll(Completed&);
595 void HandleMergeMessage(std::string
const& line) noexcept;
597 using StateVariant = std::variant<Scheduled, Collecting, Merging, Releasing, Completed>;
603 void SetState(StateVariant s,
bool error =
false);
604 void SetError(
bool error);
609 std::string GetArchiveFilename()
const;
615 std::unique_ptr<DaqWorkspace> m_workspace;
617 RsyncFactory m_rsync_factory;
618 ProcFactory m_proc_factory;
626 std::filesystem::path m_result;
628 StateVariant m_state_ctx;
634 boost::signals2::scoped_connection m_status_connection;
641 std::shared_ptr<bool> m_liveness;
647 bool m_soft_stop =
false;
648 log4cplus::Logger m_logger;
daq::AsyncProcess class definition
Interface to asynchronous process.
Abstract factory for DaqControllers.
Logs output to logger and keeps last N lines in circular buffer for later retrieval.
Stores data acquisition status and allows subscription to status changes.
Interface to interact with DPM workspace.
Internal data structure to SchedulerImpl.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
virtual auto GetState() const noexcept -> State=0
virtual auto GetId() const noexcept -> std::string const &=0
virtual void Start()=0
Start/stop operations.
virtual auto GetErrorFlag() const noexcept -> bool=0
virtual auto GetStatus() noexcept -> ObservableStatus &=0
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
virtual bool IsStopped() const noexcept=0
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Schedules asynchronous activities that result in merged Data Product and delivery.
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
virtual void Start()=0
Start/stop operations.
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
Provides location of fits source file.
Interface to interact with DPM workspace.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::signals2::scoped_connection net_receive
boost::signals2::scoped_connection daqs
boost::signals2::scoped_connection merge
boost::signals2::scoped_connection net_send
Imposes limits on how many concurrent operations are allowed.
Options for DaqController.
Options controlling scheduler operations.
Imposes limits on how many concurrent operations are allowed.
std::variant< OlasReceiver > ReceiverTypes
Close representation of the JSON structure but with stronger types.
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Options controlling rsync invocation.
daq::RsyncAsyncProcess and related class declarations.
Declares daq::dpm::SourceResolver.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of data acquisition.
std::shared_ptr< RsyncAsyncProcessIf > proc
std::filesystem::path local_path
Rsync transfer of Data Product to a receiver.
std::shared_ptr< RsyncAsyncProcessIf > proc
Transfer.
ResourceToken token
Token held to implement resource limits.