ifw-daq  3.0.1
IFW Data Acquisition modules
makeDpSpec.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright
5  * (c) Copyright ESO 2022
6  * All Rights Reserved
7  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
8  */
9 #include <daq/makeDpSpec.hpp>
10 
11 #include <fmt/format.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/fits/json.hpp>
15 #include <daq/json/dpSpec.hpp>
16 
17 namespace daq {
18 
19 std::string MakeOcmName(DaqContext const& ctx) {
20  return fmt::format("@{}", ctx.process_name);
21 }
22 
23 /**
24  * Make OCM keywords source.
25  */
26 json::FitsKeywordsSource MakeOcmKeywords(DaqContext const& ctx, log4cplus::Logger& logger) {
28  kws.keywords = ctx.keywords;
29  kws.source_name = MakeOcmName(ctx);
30  return kws;
31 }
32 
33 namespace v1 {
34 
35 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
36  json::DpSpec dp_spec;
37  dp_spec.id = ctx.id;
38  dp_spec.target.file_id = ctx.file_id;
39  dp_spec.target.file_prefix = ctx.dp_name_prefix;
40 
41  // Add OCM keywords
42  if (!ctx.keywords.empty()) {
43  dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
44  }
45 
46  // V1 heuristics for when DaqContext has no specification provided by user.
47  // If
48  // - number of primary sources == 1
49  // - number of files from that primary == 1
50  // then we automatically designate it the *in-place* target.
51  std::optional<std::string> target_source_name;
52 
53  if (1 == ctx.prim_sources.size() &&
54  1 == std::count_if(ctx.results.begin(),
55  ctx.results.end(),
56  [source = ctx.prim_sources[0].name](DpPart const& part) {
57  return part.SourceName() == source;
58  })) {
59  auto it = std::find_if(ctx.results.begin(),
60  ctx.results.end(),
61  [source = ctx.prim_sources[0].name](DpPart const& part) {
62  return part.SourceName() == source;
63  });
64  assert(it != ctx.results.end());
65  if (std::holds_alternative<std::string>(it->Part())) {
66  // At this point we have:
67  // 1 primary source
68  // 1 output fits file
69  auto const& path = std::get<std::string>(it->Part());
70  target_source_name = ctx.prim_sources[0].name;
71  LOG4CPLUS_DEBUG(logger,
72  fmt::format("{}: Heuristics resulted in using the file "
73  "{} from {} as *in-place* merge target.",
74  ctx.id,
75  path,
76  *target_source_name));
77  auto& source = dp_spec.target.source.emplace();
78  source.source_name = it->SourceName();
79  source.location = path;
80  }
81  }
82 
83  if (ctx.results.empty()) {
84  throw boost::enable_current_exception(
85  std::invalid_argument("Cannot create data product specification with no results"));
86  }
87 
88  for (DpPart const& r : ctx.results) {
89  if (target_source_name && *target_source_name == r.SourceName()) {
90  // Skip target file
91  continue;
92  }
93  if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
94  // keywords
96  s.source_name = r.SourceName();
97  s.keywords = std::get<fits::KeywordVector>(r.Part());
98  dp_spec.sources.push_back(s);
99  } else if (std::holds_alternative<std::string>(r.Part())) {
101  s.source_name = r.SourceName();
102  s.location = std::get<std::string>(r.Part());
103  dp_spec.sources.push_back(s);
104  }
105  }
106 
107  return dp_spec;
108 }
109 
110 } // namespace v1
111 
112 namespace v2 {
113 
114 /**
115  * Per data source common specification that is only used for more efficient lookup.
116  */
118  /**
119  * Position index in original specification, used to order sources.
120  */
121  std::size_t index = std::numeric_limits<std::size_t>::max();
122  std::optional<json::InitialKeywords> initial_keywords = std::nullopt;
124 };
125 
126 std::unordered_map<std::string, CommonSourceSpecifications>
128  // NOTE: index 0 is reserved for internal OCM keywords
129  std::size_t index = 1;
130  std::unordered_map<std::string, CommonSourceSpecifications> lookup_map;
131 
132  for (auto const& source : spec.sources) {
134  common.index = index++;
135  std::string source_name;
136  std::visit(
137  [&](auto const& v) {
138  source_name = v.source_name;
139  common.initial_keywords = v.initial_keywords;
140  common.keyword_rules = v.keyword_rules;
141  },
142  source);
143 
144  lookup_map.emplace(std::move(source_name), std::move(common));
145  }
146  return lookup_map;
147 }
148 
149 /**
150  * Creates and returns the `/sources` and `/target` structures using DaqContext::specification.
151  *
152  * Assumptions:
153  * - source names are unique (DAQ result `DpPart` is uniqely identifying where it comes from.
154  */
155 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
156  json::DpSpec dp_spec;
157  dp_spec.id = ctx.id;
158  dp_spec.target.file_id = ctx.file_id;
159  dp_spec.target.file_prefix = ctx.dp_name_prefix;
160 
161  // Receivers are simply copied from StartDaqV2Spec
162  if (ctx.specification) {
163  dp_spec.receivers = ctx.specification->receivers;
164  LOG4CPLUS_DEBUG(logger,
165  fmt::format("{}: Number of receivers added from StartDaqV2Spec: {}",
166  ctx.id,
167  dp_spec.receivers.size()));
168  } else {
169  LOG4CPLUS_DEBUG(logger,
170  fmt::format("{}: No receivers added from StartDaqV2Spec: {}",
171  ctx.id,
172  dp_spec.receivers.size()));
173  }
174 
175  // Add OCM keywords
176  if (!ctx.keywords.empty()) {
177  dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
178  }
179 
180  auto lookup = [lookup_map = MakeCommonSpecifications(*ctx.specification)](
181  std::string const& source_name) -> CommonSourceSpecifications const* {
182  auto it = lookup_map.find(source_name);
183  if (it != std::end(lookup_map)) {
184  return &it->second;
185  }
186  return nullptr;
187  };
188 
189  // Objectives:
190  // - Use order from specification.sources
191  // - Use for each result use the specified keywordRules from specification.sources.
192  // - Use specified target (use lookup by sourceName)
193 
194  for (DpPart const& r : ctx.results) {
195  LOG4CPLUS_DEBUG(logger, "Adding DpPart " << r);
196  auto* common = lookup(r.SourceName());
197 
198  if (ctx.specification->merge_target.has_value() &&
199  ctx.specification->merge_target->source_name == r.SourceName() &&
200  std::holds_alternative<std::string>(r.Part())) {
201  LOG4CPLUS_DEBUG(logger, "Considering merge target " << r);
202  // Source is a file (as identified by std::string) that matches requested merge_target
203  // name -> Add as merge target!
204 
205  if (!dp_spec.target.source.has_value()) {
207  s.source_name = r.SourceName();
208  s.location = std::get<std::string>(r.Part());
209  dp_spec.target.source = s;
210  // NOTE keyword rules are not used for target yet.
211  LOG4CPLUS_DEBUG(logger,
212  fmt::format("Added merge target source from {} with file {}",
213  s.source_name,
214  s.location));
215  continue;
216  } else {
217  LOG4CPLUS_WARN(logger,
218  fmt::format("Multiple source files matched as merge-target! First "
219  "one has been chosen: {}",
220  dp_spec.target.source->location));
221  }
222  }
223 
224  // Non merge-target sources which are either JSON keywords or FITS files
225  if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
227  s.source_name = r.SourceName();
228  s.keywords = std::get<fits::KeywordVector>(r.Part());
229  if (common != nullptr) {
230  LOG4CPLUS_INFO(logger,
231  "Has common, and " << (!common->keyword_rules.empty()
232  ? "has keyword rules"
233  : "does NOT have keyword rules"));
234  s.initial_keywords = common->initial_keywords;
235  s.keyword_rules = common->keyword_rules;
236  }
237  dp_spec.sources.push_back(s);
238  } else if (std::holds_alternative<std::string>(r.Part())) {
240  s.source_name = r.SourceName();
241  s.location = std::get<std::string>(r.Part());
242  if (common != nullptr) {
243  LOG4CPLUS_INFO(logger,
244  "Has common, and " << (!common->keyword_rules.empty()
245  ? "has keyword rules"
246  : "does NOT have keyword rules"));
247  s.initial_keywords = common->initial_keywords;
248  s.keyword_rules = common->keyword_rules;
249  }
250  dp_spec.sources.push_back(s);
251  }
252  }
253 
254  // Lookup index of provided source.
255  auto index_of =
256  [&, ocm_name = MakeOcmName(ctx)](json::DpSpec::SourceTypes const& s) -> std::size_t {
257  auto const& name = std::visit([](auto const& t) { return t.source_name; }, s);
258  if (name == ocm_name) {
259  // Special case for OCM which needs to come first.
260  return 0;
261  }
262  auto* res = lookup(name);
263  if (res != nullptr) {
264  return res->index;
265  }
266  return std::numeric_limits<std::size_t>::max();
267  };
268 
269  // Stable sort sources according to the index order.
270  std::stable_sort(std::begin(dp_spec.sources),
271  std::end(dp_spec.sources),
272  [&](json::DpSpec::SourceTypes const& a, json::DpSpec::SourceTypes const& b) {
273  return index_of(a) < index_of(b);
274  });
275 
276  return dp_spec;
277 }
278 
279 } // namespace v2
280 
281 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
282  LOG4CPLUS_DEBUG(logger,
283  "MakeDataProductSpecification: DaqContext has a specification: "
284  << (ctx.specification.has_value() ? "yes" : "no"));
285  if (ctx.specification.has_value()) {
286  return v2::MakeDataProductSpecification(ctx, logger);
287  } else {
288  return v1::MakeDataProductSpecification(ctx, logger);
289  }
290 }
291 
292 } // namespace daq
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
auto Part() const noexcept -> PartTypes const &
Holds a std::string path [[user]@host:]path or FITS keywords.
Definition: dpPart.hpp:54
auto SourceName() const noexcept -> std::string const &
Source name of the part.
Definition: dpPart.hpp:44
Contains data structure for FITS keywords.
std::string id
DAQ id.
Definition: dpSpec.hpp:45
Target target
Describes target which will become the data product.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
std::vector< KeywordRuleTypes > KeywordRules
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
std::variant< FitsKeywordsSource, FitsFileSource > SourceTypes
Definition: dpSpec.hpp:40
std::string file_prefix
Optional user chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
std::vector< DataSourceTypes > sources
Definition: startDaqV2.hpp:51
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Definition: makeDpSpec.cpp:35
std::optional< json::InitialKeywords > initial_keywords
Definition: makeDpSpec.cpp:122
std::unordered_map< std::string, CommonSourceSpecifications > MakeCommonSpecifications(json::StartDaqV2Spec const &spec)
Definition: makeDpSpec.cpp:127
std::size_t index
Position index in original specification, used to order sources.
Definition: makeDpSpec.cpp:121
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates and returns the /sources and /target structures using DaqContext::specification.
Definition: makeDpSpec.cpp:155
Per data source common specification that is only used for more efficient lookup.
Definition: makeDpSpec.cpp:117
json::FitsKeywordsSource MakeOcmKeywords(DaqContext const &ctx, log4cplus::Logger &logger)
Make OCM keywords source.
Definition: makeDpSpec.cpp:26
std::string MakeOcmName(DaqContext const &ctx)
Definition: makeDpSpec.cpp:19
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:281
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:102
std::string process_name
User defined process name.
Definition: daqContext.hpp:70
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:87
std::vector< Source > prim_sources
Definition: daqContext.hpp:76
std::optional< json::StartDaqV2Spec > specification
Optional specification, if DAQ was started using StartDaqV2.
Definition: daqContext.hpp:116
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:75
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60