DPsim
InterfaceWorkerVillas.cpp
1 // SPDX-License-Identifier: Apache-2.0
2 
3 #include <cstdio>
4 #include <cstdlib>
5 #include <stdexcept>
6 
7 #include <chrono>
8 #include <poll.h>
9 #include <thread>
10 #include <unistd.h>
11 
12 #include <spdlog/sinks/stdout_color_sinks.h>
13 
14 #include <dpsim-models/Logger.h>
15 #include <dpsim-villas/InterfaceWorkerVillas.h>
16 #include <villas/path.hpp>
17 #include <villas/signal_list.hpp>
18 
19 using namespace CPS;
20 using namespace DPsim;
21 
22 Bool InterfaceWorkerVillas::villasInitialized = false;
23 UInt InterfaceWorkerVillas::villasAffinity = 0;
24 UInt InterfaceWorkerVillas::villasPriority = 0;
25 UInt InterfaceWorkerVillas::villasHugePages = 100;
26 
27 InterfaceWorkerVillas::InterfaceWorkerVillas(const String &nodeConfig,
28  UInt queueLength,
29  UInt sampleLength,
30  spdlog::level::level_enum logLevel)
31  : InterfaceWorker(), mNodeConfig(nodeConfig), mQueueLength(queueLength),
32  mSampleLength(sampleLength) {
33  mLog = CPS::Logger::get("InterfaceWorkerVillas", logLevel, logLevel);
34 }
35 
36 void InterfaceWorkerVillas::open() {
37  SPDLOG_LOGGER_INFO(mLog, "Opening InterfaceWorkerVillas...");
38  Log::getInstance().setLevel(mLog->level());
39 
40  if (!InterfaceWorkerVillas::villasInitialized) {
41  SPDLOG_LOGGER_INFO(mLog, "Initializing Villas...");
42  initVillas();
43  InterfaceWorkerVillas::villasInitialized = true;
44  }
45 
46  json_error_t error;
47  json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
48  if (config == nullptr) {
49  SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to parse node config! Error: {}",
50  error.text);
51  throw JsonError(config, error);
52  }
53 
54  const json_t *nodeType = json_object_get(config, "type");
55  if (nodeType == nullptr) {
56  SPDLOG_LOGGER_ERROR(mLog, "Error: Node config does not contain type-key!");
57  std::exit(1);
58  }
59  String nodeTypeString = json_string_value(nodeType);
60 
61  mNode = node::NodeFactory::make(nodeTypeString);
62 
63  int ret = 0;
64  ret = mNode->parse(config);
65  if (ret < 0) {
66  SPDLOG_LOGGER_ERROR(mLog,
67  "Error: Node in InterfaceVillas failed to parse "
68  "config. Parse returned code {}",
69  ret);
70  std::exit(1);
71  }
72  ret = mNode->check();
73  if (ret < 0) {
74  SPDLOG_LOGGER_ERROR(
75  mLog,
76  "Error: Node in InterfaceVillas failed check. Check returned code {}",
77  ret);
78  std::exit(1);
79  }
80 
81  SPDLOG_LOGGER_INFO(mLog, "Preparing VILLASNode instance...");
82  setupNodeSignals();
83  prepareNode();
84  SPDLOG_LOGGER_INFO(mLog, "Node is ready to send / receive data!");
85  mOpened = true;
86 
87  mSequence = 0;
88  mLastSample = node::sample_alloc(&mSamplePool);
89  mLastSample->signals = mNode->getInputSignals(false);
90  mLastSample->sequence = 0;
91  mLastSample->ts.origin.tv_sec = 0;
92  mLastSample->ts.origin.tv_nsec = 0;
93 
94  std::memset(&mLastSample->data, 0, mLastSample->capacity * sizeof(float));
95 }
96 
97 void InterfaceWorkerVillas::prepareNode() {
98  int ret = node::pool_init(&mSamplePool, mQueueLength,
99  sizeof(Sample) + SAMPLE_DATA_LENGTH(mSampleLength));
100  if (ret < 0) {
101  SPDLOG_LOGGER_ERROR(mLog,
102  "Error: InterfaceVillas failed to init sample pool. "
103  "pool_init returned code {}",
104  ret);
105  std::exit(1);
106  }
107 
108  ret = mNode->prepare();
109  if (ret < 0) {
110  SPDLOG_LOGGER_ERROR(mLog,
111  "Error: Node in InterfaceVillas failed to prepare. "
112  "Prepare returned code {}",
113  ret);
114  std::exit(1);
115  }
116  SPDLOG_LOGGER_INFO(mLog, "Node: {}", mNode->getNameFull());
117 
118  mNode->getFactory()->start(
119  nullptr); //We have no SuperNode, so just hope type_start doesnt use it...
120 
121  ret = mNode->start();
122  if (ret < 0) {
123  SPDLOG_LOGGER_ERROR(mLog,
124  "Fatal error: failed to start node in InterfaceVillas. "
125  "Start returned code {}",
126  ret);
127  close();
128  std::exit(1);
129  }
130 }
131 
132 void InterfaceWorkerVillas::setupNodeSignals() {
133  mNode->out.path = new node::Path();
134  mNode->out.path->signals = std::make_shared<node::SignalList>();
135  node::SignalList::Ptr nodeOutputSignals =
136  mNode->out.path->getOutputSignals(false);
137  nodeOutputSignals->clear();
138  int idx = 0;
139  for (const auto &[id, signal] : mExportSignals) {
140  while (id > idx) {
141  nodeOutputSignals->push_back(
142  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
143  idx++;
144  }
145  nodeOutputSignals->push_back(signal);
146  idx++;
147  }
148 
149  node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(true);
150  if (nodeInputSignals == nullptr) {
151  nodeInputSignals = std::make_shared<node::SignalList>();
152  } else {
153  nodeInputSignals->clear();
154  }
155  idx = 0;
156  for (const auto &[id, signal] : mImportSignals) {
157  while (id > idx) {
158  nodeInputSignals->push_back(
159  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
160  idx++;
161  }
162  nodeInputSignals->push_back(signal);
163  idx++;
164  }
165 }
166 
168  SPDLOG_LOGGER_INFO(mLog, "Closing InterfaceVillas...");
169  int ret = mNode->stop();
170  if (ret < 0) {
171  SPDLOG_LOGGER_ERROR(
172  mLog,
173  "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
174  ret);
175  std::exit(1);
176  }
177  mOpened = false;
178  ret = node::pool_destroy(&mSamplePool);
179  if (ret < 0) {
180  SPDLOG_LOGGER_ERROR(mLog,
181  "Error: failed to destroy SamplePool in "
182  "InterfaceVillas. pool_destroy returned code {}",
183  ret);
184  std::exit(1);
185  }
186 
187  mNode->getFactory()->stop();
188 
189  delete mNode;
190 }
191 
193  std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
194  Sample *sample = nullptr;
195  int ret = 0;
196  bool shouldRead = false;
197  try {
198  auto pollFds = mNode->getPollFDs();
199 
200  if (!pollFds.empty()) {
201  auto pfds = std::vector<struct pollfd>();
202 
203  for (auto pollFd : pollFds) {
204  pfds.push_back(pollfd{.fd = pollFd, .events = POLLIN});
205  }
206 
207  ret = ::poll(pfds.data(), pfds.size(), 0);
208 
209  if (ret < 0) {
210  SPDLOG_LOGGER_ERROR(mLog,
211  "Fatal error: failed to read sample from "
212  "InterfaceVillas. Poll returned code {}",
213  ret);
214  close();
215  std::exit(1);
216  }
217 
218  if (ret == 0) {
219  return;
220  }
221 
222  for (const auto &pfd : pfds) {
223  if (pfd.revents & POLLIN) {
224  shouldRead = true;
225  break;
226  }
227  }
228  } else {
229  //If the node does not support pollFds just do a blocking read
230  shouldRead = true;
231  }
232 
233  if (shouldRead) {
234  sample = node::sample_alloc(&mSamplePool);
235 
236  ret = mNode->read(&sample, 1);
237  if (ret < 0) {
238  SPDLOG_LOGGER_ERROR(mLog,
239  "Fatal error: failed to read sample from "
240  "InterfaceVillas. Read returned code {}",
241  ret);
242  close();
243  std::exit(1);
244  }
245 
246  if (ret != 0) {
247  for (UInt i = 0; i < mImports.size(); i++) {
248  auto importedAttr = std::get<0>(mImports[i])(sample);
249  if (!importedAttr.isNull()) {
250  updatedAttrs.emplace_back(InterfaceQueued::AttributePacket{
251  importedAttr, i, mCurrentSequenceInterfaceToDpsim,
252  InterfaceQueued::AttributePacketFlags::PACKET_NO_FLAGS});
253  mCurrentSequenceInterfaceToDpsim++;
254  }
255  }
256 
257  if (!pollFds.empty()) {
258  //Manually clear the event file descriptor since Villas does not do that for some reason
259  //See https://github.com/VILLASframework/node/issues/309
260  uint64_t result = 0;
261  ret = (int)::read(pollFds[0], &result, 8);
262  if (ret < 0) {
263  SPDLOG_LOGGER_WARN(
264  mLog, "Could not reset poll file descriptor! Read returned {}",
265  ret);
266  }
267  if (result > 1) {
268  result = result - 1;
269  ret = (int)::write(pollFds[0], (void *)&result, 8);
270  if (ret < 0) {
271  SPDLOG_LOGGER_WARN(
272  mLog,
273  "Could not decrement poll file descriptor! Write returned {}",
274  ret);
275  }
276  }
277  }
278  }
279 
280  sample_decref(sample);
281  }
282  } catch (const std::exception &) {
283  if (sample)
284  sample_decref(sample);
285 
286  throw;
287  }
288 }
289 
291  std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
292  // Update export sequence IDs
293  for (const auto &packet : updatedAttrs) {
294  if (std::get<1>(mExports[packet.attributeId]) < packet.sequenceId) {
295  std::get<1>(mExports[packet.attributeId]) = packet.sequenceId;
296  }
297  }
298 
299  // Remove outdated packets
300  auto beginOutdated = std::remove_if(
301  updatedAttrs.begin(), updatedAttrs.end(), [this](auto packet) {
302  return std::get<1>(mExports[packet.attributeId]) > packet.sequenceId;
303  });
304  updatedAttrs.erase(beginOutdated, updatedAttrs.end());
305 
306  Sample *sample = nullptr;
307  Int ret = 0;
308  bool done = false;
309  bool sampleFilled = false;
310  try {
311  sample = node::sample_alloc(&mSamplePool);
312  if (sample == nullptr) {
313  SPDLOG_LOGGER_ERROR(mLog, "InterfaceVillas could not allocate a new "
314  "sample! Not sending any data!");
315  return;
316  }
317 
318  sample->signals = mNode->getOutputSignals(false);
319  auto beginExported = std::remove_if(
320  updatedAttrs.begin(), updatedAttrs.end(),
321  [this, &sampleFilled, &sample](auto packet) {
322  if (!std::get<2>(mExports[packet.attributeId])) {
323  //Write attribute to sample ASAP
324  std::get<0>(mExports[packet.attributeId])(packet.value, sample);
325  sampleFilled = true;
326  return true;
327  }
328  return false;
329  });
330  updatedAttrs.erase(beginExported, updatedAttrs.end());
331 
332  //Check if the remaining packets form a complete set
333  if (((long)updatedAttrs.size()) ==
334  std::count_if(mExports.cbegin(), mExports.cend(),
335  [](auto x) { return std::get<2>(x); })) {
336  for (const auto &packet : updatedAttrs) {
337  std::get<0>(mExports[packet.attributeId])(packet.value, sample);
338  }
339  sampleFilled = true;
340  updatedAttrs.clear();
341  }
342 
343  if (sampleFilled) {
344  sample->sequence = mSequence++;
345  sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
346  sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
347  clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
348  sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
349  done = true;
350 
351  do {
352  ret = mNode->write(&sample, 1);
353  } while (ret == 0);
354  if (ret < 0)
355  SPDLOG_LOGGER_ERROR(mLog,
356  "Failed to write samples to InterfaceVillas. Write "
357  "returned code {}",
358  ret);
359 
360  sample_copy(mLastSample, sample);
361  }
362  sample_decref(sample);
363  } catch (const std::exception &) {
364  /* We need to at least send something, so determine where exactly the
365  * timer expired and either resend the last successfully sent sample or
366  * just try to send this one again.
367  * TODO: can this be handled better? */
368  if (!done)
369  sample = mLastSample;
370 
371  while (ret == 0)
372  ret = mNode->write(&sample, 1);
373 
374  sample_decref(sample);
375 
376  if (ret < 0)
377  SPDLOG_LOGGER_ERROR(
378  mLog,
379  "Failed to write samples to InterfaceVillas. Write returned code {}",
380  ret);
381 
382  /* Don't throw here, because we managed to send something */
383  }
384 }
385 
386 void InterfaceWorkerVillas::initVillas() const {
387  if (int ret = node::memory::init(villasHugePages); ret)
388  throw RuntimeError("Error: VillasNode failed to initialize memory system");
389 
390  villas::kernel::rt::init(villasPriority, villasAffinity);
391 }
392 
393 void InterfaceWorkerVillas::configureExport(UInt attributeId,
394  const std::type_info &type,
395  UInt idx, Bool waitForOnWrite,
396  const String &name,
397  const String &unit) {
398  if (mOpened) {
399  if (mLog != nullptr) {
400  SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
401  "Configuration will remain unchanged.");
402  }
403  return;
404  }
405  if (attributeId != mExports.size()) {
406  if (mLog != nullptr) {
407  SPDLOG_LOGGER_WARN(
408  mLog, "The exports already configured do not match with the given "
409  "attribute ID! Configuration will remain unchanged.");
410  }
411  return;
412  }
413 
414  if (type == typeid(Int)) {
415  mExports.emplace_back(
416  [idx](AttributeBase::Ptr attr, Sample *smp) {
417  if (idx >= smp->capacity)
418  throw std::out_of_range("not enough space in allocated sample");
419  if (idx >= smp->length)
420  smp->length = idx + 1;
421 
422  Attribute<Int>::Ptr attrTyped =
423  std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
424 
425  if (attrTyped.isNull())
427 
428  smp->data[idx].i = **attrTyped;
429  },
430  0, waitForOnWrite);
431  mExportSignals[idx] =
432  std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
433  } else if (type == typeid(Real)) {
434  mExports.emplace_back(
435  [idx](AttributeBase::Ptr attr, Sample *smp) {
436  if (idx >= smp->capacity)
437  throw std::out_of_range("not enough space in allocated sample");
438  if (idx >= smp->length)
439  smp->length = idx + 1;
440 
441  Attribute<Real>::Ptr attrTyped =
442  std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
443 
444  if (attrTyped.isNull())
446 
447  smp->data[idx].f = **attrTyped;
448  },
449  0, waitForOnWrite);
450  mExportSignals[idx] =
451  std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
452  } else if (type == typeid(Complex)) {
453  mExports.emplace_back(
454  [idx](AttributeBase::Ptr attr, Sample *smp) {
455  if (idx >= smp->capacity)
456  throw std::out_of_range("not enough space in allocated sample");
457  if (idx >= smp->length)
458  smp->length = idx + 1;
459 
460  Attribute<Complex>::Ptr attrTyped =
461  std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
462 
463  if (attrTyped.isNull())
465 
466  smp->data[idx].z = **attrTyped;
467  },
468  0, waitForOnWrite);
469  mExportSignals[idx] =
470  std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
471  } else if (type == typeid(Bool)) {
472  mExports.emplace_back(
473  [idx](AttributeBase::Ptr attr, Sample *smp) {
474  if (idx >= smp->capacity)
475  throw std::out_of_range("not enough space in allocated sample");
476  if (idx >= smp->length)
477  smp->length = idx + 1;
478 
479  Attribute<Bool>::Ptr attrTyped =
480  std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
481 
482  if (attrTyped.isNull())
484 
485  smp->data[idx].b = **attrTyped;
486  },
487  0, waitForOnWrite);
488  mExportSignals[idx] =
489  std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
490  } else {
491  if (mLog != nullptr) {
492  SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
493  "configuration will remain unchanged!");
494  }
495  }
496 }
497 
498 void InterfaceWorkerVillas::configureImport(UInt attributeId,
499  const std::type_info &type,
500  UInt idx, const String &name,
501  const String &unit) {
502  if (mOpened) {
503  if (mLog != nullptr) {
504  SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
505  "Configuration will remain unchanged.");
506  }
507  return;
508  }
509  if (attributeId != mImports.size()) {
510  if (mLog != nullptr) {
511  SPDLOG_LOGGER_WARN(
512  mLog, "The imports already configured do not match with the given "
513  "attribute ID! Configuration will remain unchanged.");
514  }
515  return;
516  }
517  const auto &log = mLog;
518 
519  if (type == typeid(Int)) {
520  mImports.emplace_back(
521  [idx, log](Sample *smp) -> AttributeBase::Ptr {
522  if (idx >= smp->length) {
523  log->error("incomplete data received from InterfaceVillas");
524  return nullptr;
525  }
527  AttributeStatic<Int>::make(smp->data[idx].i));
528  },
529  0);
530  mImportSignals[idx] =
531  std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
532  } else if (type == typeid(Real)) {
533  mImports.emplace_back(
534  [idx, log](Sample *smp) -> AttributeBase::Ptr {
535  if (idx >= smp->length) {
536  log->error("incomplete data received from InterfaceVillas");
537  return nullptr;
538  }
540  AttributeStatic<Real>::make(smp->data[idx].f));
541  },
542  0);
543  mImportSignals[idx] =
544  std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
545  } else if (type == typeid(Complex)) {
546  mImports.emplace_back(
547  [idx, log](Sample *smp) -> AttributeBase::Ptr {
548  if (idx >= smp->length) {
549  log->error("incomplete data received from InterfaceVillas");
550  return nullptr;
551  }
553  AttributeStatic<Complex>::make(smp->data[idx].z));
554  },
555  0);
556  mImportSignals[idx] =
557  std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
558  } else if (type == typeid(Bool)) {
559  mImports.emplace_back(
560  [idx, log](Sample *smp) -> AttributeBase::Ptr {
561  if (idx >= smp->length) {
562  log->error("incomplete data received from InterfaceVillas");
563  return nullptr;
564  }
566  AttributeStatic<Bool>::make(smp->data[idx].b));
567  },
568  0);
569  mImportSignals[idx] =
570  std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
571  } else {
572  if (mLog != nullptr) {
573  SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
574  "configuration will remain unchanged!");
575  }
576  }
577 }
578 
579 void InterfaceWorkerVillas::printSignals() const {
580  SPDLOG_LOGGER_INFO(
581  mLog,
582  "InterfaceWorkerVillas Settings: Queue Length: {}, Sample Length: {}",
583  mQueueLength, mSampleLength);
584  SPDLOG_LOGGER_INFO(mLog, "Export signals:");
585  for (const auto &[id, signal] : mExportSignals) {
586  SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
587  signal->name, signal->unit,
588  node::signalTypeToString(signal->type));
589  }
590 
591  SPDLOG_LOGGER_INFO(mLog, "Import signals:");
592  for (const auto &[id, signal] : mImportSignals) {
593  SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
594  signal->name, signal->unit,
595  node::signalTypeToString(signal->type));
596  }
597 }
void writeValuesToEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override
void readValuesFromEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override