ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
makeDpSpec.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_libdaq
4 * @copyright
5 * (c) Copyright ESO 2022
6 * All Rights Reserved
7 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
8 */
9#include <daq/makeDpSpec.hpp>
10
11#include <boost/exception/enable_current_exception.hpp>
12#include <fmt/format.h>
13#include <log4cplus/loggingmacros.h>
14
15#include <daq/fits/json.hpp>
16#include <daq/json/dpSpec.hpp>
17
18namespace daq {
19
20std::string MakeOcmName(DaqContext const& ctx) {
21 return fmt::format("@{}", ctx.process_name);
22}
23
24/**
25 * Make OCM keywords source.
26 */
27json::FitsKeywordsSource MakeOcmKeywords(DaqContext const& ctx, log4cplus::Logger& logger) {
29 kws.keywords = ctx.keywords;
30 kws.source_name = MakeOcmName(ctx);
31 return kws;
32}
33
34namespace v1 {
35
36json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
37 json::DpSpec dp_spec;
38 dp_spec.id = ctx.id;
39 dp_spec.target.file_id = ctx.file_id;
40 dp_spec.target.file_prefix = ctx.dp_name_prefix;
41
42 // Add OCM keywords
43 if (!ctx.keywords.empty()) {
44 dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
45 }
46
47 // V1 heuristics for when DaqContext has no specification provided by user.
48 // If
49 // - number of primary sources == 1
50 // - number of files from that primary == 1
51 // then we automatically designate it the *in-place* target.
52 std::optional<std::string> target_source_name;
53
54 if (1 == ctx.prim_sources.size() &&
55 1 == std::count_if(ctx.results.begin(),
56 ctx.results.end(),
57 [source = ctx.prim_sources[0].name](DpPart const& part) {
58 return part.SourceName() == source;
59 })) {
60 auto it = std::find_if(ctx.results.begin(),
61 ctx.results.end(),
62 [source = ctx.prim_sources[0].name](DpPart const& part) {
63 return part.SourceName() == source;
64 });
65 assert(it != ctx.results.end());
66 if (std::holds_alternative<std::string>(it->Part())) {
67 // At this point we have:
68 // 1 primary source
69 // 1 output fits file
70 auto const& path = std::get<std::string>(it->Part());
71 target_source_name = ctx.prim_sources[0].name;
72 LOG4CPLUS_DEBUG(logger,
73 fmt::format("{}: Heuristics resulted in using the file "
74 "{} from {} as *in-place* merge target.",
75 ctx.id,
76 path,
77 *target_source_name));
78 auto& source = dp_spec.target.source.emplace();
79 source.source_name = it->SourceName();
80 source.location = path;
81 }
82 }
83
84 if (ctx.results.empty()) {
85 throw boost::enable_current_exception(
86 std::invalid_argument("Cannot create data product specification with no results"));
87 }
88
89 for (DpPart const& r : ctx.results) {
90 if (target_source_name && *target_source_name == r.SourceName()) {
91 // Skip target file
92 continue;
93 }
94 if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
95 // keywords
97 s.source_name = r.SourceName();
98 s.keywords = std::get<fits::KeywordVector>(r.Part());
99 dp_spec.sources.push_back(s);
100 } else if (std::holds_alternative<std::string>(r.Part())) {
102 s.source_name = r.SourceName();
103 s.location = std::get<std::string>(r.Part());
104 dp_spec.sources.push_back(s);
105 }
106 }
107
108 return dp_spec;
109}
110
111} // namespace v1
112
113namespace v2 {
114
115/**
116 * Per data source common specification that is only used for more efficient lookup.
117 */
119 /**
120 * Position index in original specification, used to order sources.
121 */
122 std::size_t index = std::numeric_limits<std::size_t>::max();
123 std::optional<json::InitialKeywords> initial_keywords = std::nullopt;
125};
126
127std::unordered_map<std::string, CommonSourceSpecifications>
129 // NOTE: index 0 is reserved for internal OCM keywords
130 std::size_t index = 1;
131 std::unordered_map<std::string, CommonSourceSpecifications> lookup_map;
132
133 for (auto const& source : spec.sources) {
135 common.index = index++;
136 std::string source_name;
137 std::visit(
138 [&](auto const& v) {
139 source_name = v.source_name;
140 common.initial_keywords = v.initial_keywords;
141 common.keyword_rules = v.keyword_rules;
142 },
143 source);
144
145 lookup_map.emplace(std::move(source_name), std::move(common));
146 }
147 return lookup_map;
148}
149
150/**
151 * Creates and returns the `/sources` and `/target` structures using DaqContext::specification.
152 *
153 * Assumptions:
154 * - source names are unique (DAQ result `DpPart` is uniqely identifying where it comes from.
155 */
156json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
157 json::DpSpec dp_spec;
158 dp_spec.id = ctx.id;
159 dp_spec.target.file_id = ctx.file_id;
160 dp_spec.target.file_prefix = ctx.dp_name_prefix;
161
162 // Receivers are simply copied from StartDaqV2Spec
163 if (ctx.specification) {
164 dp_spec.receivers = ctx.specification->receivers;
165 LOG4CPLUS_DEBUG(logger,
166 fmt::format("{}: Number of receivers added from StartDaqV2Spec: {}",
167 ctx.id,
168 dp_spec.receivers.size()));
169 } else {
170 LOG4CPLUS_DEBUG(logger,
171 fmt::format("{}: No receivers added from StartDaqV2Spec: {}",
172 ctx.id,
173 dp_spec.receivers.size()));
174 }
175
176 // Add OCM keywords
177 if (!ctx.keywords.empty()) {
178 dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
179 }
180
181 auto lookup = [lookup_map = MakeCommonSpecifications(*ctx.specification)](
182 std::string const& source_name) -> CommonSourceSpecifications const* {
183 auto it = lookup_map.find(source_name);
184 if (it != std::end(lookup_map)) {
185 return &it->second;
186 }
187 return nullptr;
188 };
189
190 // Objectives:
191 // - Use order from specification.sources
192 // - Use for each result use the specified keywordRules from specification.sources.
193 // - Use specified target (use lookup by sourceName)
194
195 for (DpPart const& r : ctx.results) {
196 LOG4CPLUS_DEBUG(logger, "Adding DpPart " << r);
197 auto* common = lookup(r.SourceName());
198
199 if (ctx.specification->merge_target.has_value() &&
200 ctx.specification->merge_target->source_name == r.SourceName() &&
201 std::holds_alternative<std::string>(r.Part())) {
202 LOG4CPLUS_DEBUG(logger, "Considering merge target " << r);
203 // Source is a file (as identified by std::string) that matches requested merge_target
204 // name -> Add as merge target!
205
206 if (!dp_spec.target.source.has_value()) {
208 s.source_name = r.SourceName();
209 s.location = std::get<std::string>(r.Part());
210 dp_spec.target.source = s;
211 // NOTE keyword rules are not used for target yet.
212 LOG4CPLUS_DEBUG(logger,
213 fmt::format("Added merge target source from {} with file {}",
214 s.source_name,
215 s.location));
216 continue;
217 } else {
218 LOG4CPLUS_WARN(logger,
219 fmt::format("Multiple source files matched as merge-target! First "
220 "one has been chosen: {}",
221 dp_spec.target.source->location));
222 }
223 }
224
225 // Non merge-target sources which are either JSON keywords or FITS files
226 if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
228 s.source_name = r.SourceName();
229 s.keywords = std::get<fits::KeywordVector>(r.Part());
230 if (common != nullptr) {
231 LOG4CPLUS_INFO(logger,
232 "Has common, and " << (!common->keyword_rules.empty()
233 ? "has keyword rules"
234 : "does NOT have keyword rules"));
235 s.initial_keywords = common->initial_keywords;
236 s.keyword_rules = common->keyword_rules;
237 }
238 dp_spec.sources.push_back(s);
239 } else if (std::holds_alternative<std::string>(r.Part())) {
241 s.source_name = r.SourceName();
242 s.location = std::get<std::string>(r.Part());
243 if (common != nullptr) {
244 LOG4CPLUS_INFO(logger,
245 "Has common, and " << (!common->keyword_rules.empty()
246 ? "has keyword rules"
247 : "does NOT have keyword rules"));
248 s.initial_keywords = common->initial_keywords;
249 s.keyword_rules = common->keyword_rules;
250 }
251 dp_spec.sources.push_back(s);
252 }
253 }
254
255 // Lookup index of provided source.
256 auto index_of =
257 [&, ocm_name = MakeOcmName(ctx)](json::DpSpec::SourceTypes const& s) -> std::size_t {
258 auto const& name = std::visit([](auto const& t) { return t.source_name; }, s);
259 if (name == ocm_name) {
260 // Special case for OCM which needs to come first.
261 return 0;
262 }
263 auto* res = lookup(name);
264 if (res != nullptr) {
265 return res->index;
266 }
267 return std::numeric_limits<std::size_t>::max();
268 };
269
270 // Stable sort sources according to the index order.
271 std::stable_sort(std::begin(dp_spec.sources),
272 std::end(dp_spec.sources),
274 return index_of(a) < index_of(b);
275 });
276
277 return dp_spec;
278}
279
280} // namespace v2
281
282json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
283 LOG4CPLUS_DEBUG(logger,
284 "MakeDataProductSpecification: DaqContext has a specification: "
285 << (ctx.specification.has_value() ? "yes" : "no"));
286 if (ctx.specification.has_value()) {
287 return v2::MakeDataProductSpecification(ctx, logger);
288 } else {
289 return v1::MakeDataProductSpecification(ctx, logger);
290 }
291}
292
293} // namespace daq
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
auto Part() const noexcept -> PartTypes const &
Holds a std::string path [[user]@host:]path or FITS keywords.
Definition: dpPart.hpp:54
auto SourceName() const noexcept -> std::string const &
Source name of the part.
Definition: dpPart.hpp:44
Contains data structure for FITS keywords.
std::string id
DAQ id.
Definition: dpSpec.hpp:45
Target target
Describes target which will become the data produtc.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
std::vector< KeywordRuleTypes > KeywordRules
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
std::variant< FitsKeywordsSource, FitsFileSource > SourceTypes
Definition: dpSpec.hpp:40
std::string file_prefix
Optioal user chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
std::vector< DataSourceTypes > sources
Definition: startDaqV2.hpp:51
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Definition: makeDpSpec.cpp:36
std::optional< json::InitialKeywords > initial_keywords
Definition: makeDpSpec.cpp:123
std::unordered_map< std::string, CommonSourceSpecifications > MakeCommonSpecifications(json::StartDaqV2Spec const &spec)
Definition: makeDpSpec.cpp:128
std::size_t index
Position index in original specification, used to order sources.
Definition: makeDpSpec.cpp:122
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates and returns the /sources and /target structures using DaqContext::specification.
Definition: makeDpSpec.cpp:156
Per data source common specification that is only used for more efficient lookup.
Definition: makeDpSpec.cpp:118
json::FitsKeywordsSource MakeOcmKeywords(DaqContext const &ctx, log4cplus::Logger &logger)
Make OCM keywords source.
Definition: makeDpSpec.cpp:27
std::string MakeOcmName(DaqContext const &ctx)
Definition: makeDpSpec.cpp:20
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:282
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:100
std::string process_name
User defined process name.
Definition: daqContext.hpp:68
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:85
std::vector< Source > prim_sources
Definition: daqContext.hpp:74
std::optional< json::StartDaqV2Spec > specification
Optional specification, if DAQ was started using StartDaqV2.
Definition: daqContext.hpp:114
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:63
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:73
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:58