DPsim
InterfaceWorkerVillas.cpp
1 // SPDX-License-Identifier: Apache-2.0
2 
3 #include <cstdio>
4 #include <cstdlib>
5 #include <stdexcept>
6 
7 #include <chrono>
8 #include <poll.h>
9 #include <thread>
10 #include <unistd.h>
11 
12 #include <spdlog/sinks/stdout_color_sinks.h>
13 
14 #include <dpsim-models/Logger.h>
15 #include <dpsim-villas/InterfaceWorkerVillas.h>
16 #include <villas/path.hpp>
17 #include <villas/signal_list.hpp>
18 
19 using namespace CPS;
20 using namespace DPsim;
21 
22 Bool InterfaceWorkerVillas::villasInitialized = false;
23 UInt InterfaceWorkerVillas::villasAffinity = 0;
24 UInt InterfaceWorkerVillas::villasPriority = 0;
25 UInt InterfaceWorkerVillas::villasHugePages = 100;
26 
27 InterfaceWorkerVillas::InterfaceWorkerVillas(const String &nodeConfig,
28  UInt queueLength,
29  UInt sampleLength)
30  : InterfaceWorker(), mNodeConfig(nodeConfig), mQueueLength(queueLength),
31  mSampleLength(sampleLength) {}
32 
33 void InterfaceWorkerVillas::open() {
34  SPDLOG_LOGGER_INFO(mLog, "Opening InterfaceWorkerVillas...");
35 
36  if (!InterfaceWorkerVillas::villasInitialized) {
37  SPDLOG_LOGGER_INFO(mLog, "Initializing Villas...");
38  initVillas();
39  InterfaceWorkerVillas::villasInitialized = true;
40  }
41 
42  json_error_t error;
43  json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
44  if (config == nullptr) {
45  SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to parse node config! Error: {}",
46  error.text);
47  throw JsonError(config, error);
48  }
49 
50  const json_t *nodeType = json_object_get(config, "type");
51  if (nodeType == nullptr) {
52  SPDLOG_LOGGER_ERROR(mLog, "Error: Node config does not contain type-key!");
53  std::exit(1);
54  }
55  String nodeTypeString = json_string_value(nodeType);
56 
57  mNode = node::NodeFactory::make(nodeTypeString);
58 
59  int ret = 0;
60  ret = mNode->parse(config);
61  if (ret < 0) {
62  SPDLOG_LOGGER_ERROR(mLog,
63  "Error: Node in InterfaceVillas failed to parse "
64  "config. Parse returned code {}",
65  ret);
66  std::exit(1);
67  }
68  ret = mNode->check();
69  if (ret < 0) {
70  SPDLOG_LOGGER_ERROR(
71  mLog,
72  "Error: Node in InterfaceVillas failed check. Check returned code {}",
73  ret);
74  std::exit(1);
75  }
76 
77  SPDLOG_LOGGER_INFO(mLog, "Preparing VILLASNode instance...");
78  setupNodeSignals();
79  prepareNode();
80  SPDLOG_LOGGER_INFO(mLog, "Node is ready to send / receive data!");
81  mOpened = true;
82 
83  mSequence = 0;
84  mLastSample = node::sample_alloc(&mSamplePool);
85  mLastSample->signals = mNode->getInputSignals(false);
86  mLastSample->sequence = 0;
87  mLastSample->ts.origin.tv_sec = 0;
88  mLastSample->ts.origin.tv_nsec = 0;
89 
90  std::memset(&mLastSample->data, 0, mLastSample->capacity * sizeof(float));
91 }
92 
93 void InterfaceWorkerVillas::prepareNode() {
94  int ret = node::pool_init(&mSamplePool, mQueueLength,
95  sizeof(Sample) + SAMPLE_DATA_LENGTH(mSampleLength));
96  if (ret < 0) {
97  SPDLOG_LOGGER_ERROR(mLog,
98  "Error: InterfaceVillas failed to init sample pool. "
99  "pool_init returned code {}",
100  ret);
101  std::exit(1);
102  }
103 
104  ret = mNode->prepare();
105  if (ret < 0) {
106  SPDLOG_LOGGER_ERROR(mLog,
107  "Error: Node in InterfaceVillas failed to prepare. "
108  "Prepare returned code {}",
109  ret);
110  std::exit(1);
111  }
112  SPDLOG_LOGGER_INFO(mLog, "Node: {}", mNode->getNameFull());
113 
114  mNode->getFactory()->start(
115  nullptr); //We have no SuperNode, so just hope type_start doesnt use it...
116 
117  ret = mNode->start();
118  if (ret < 0) {
119  SPDLOG_LOGGER_ERROR(mLog,
120  "Fatal error: failed to start node in InterfaceVillas. "
121  "Start returned code {}",
122  ret);
123  close();
124  std::exit(1);
125  }
126 }
127 
128 void InterfaceWorkerVillas::setupNodeSignals() {
129  mNode->out.path = new node::Path();
130  mNode->out.path->signals = std::make_shared<node::SignalList>();
131  node::SignalList::Ptr nodeOutputSignals =
132  mNode->out.path->getOutputSignals(false);
133  nodeOutputSignals->clear();
134  int idx = 0;
135  for (const auto &[id, signal] : mExportSignals) {
136  while (id > idx) {
137  nodeOutputSignals->push_back(
138  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
139  idx++;
140  }
141  nodeOutputSignals->push_back(signal);
142  idx++;
143  }
144 
145  node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(true);
146  if (nodeInputSignals == nullptr) {
147  nodeInputSignals = std::make_shared<node::SignalList>();
148  } else {
149  nodeInputSignals->clear();
150  }
151  idx = 0;
152  for (const auto &[id, signal] : mImportSignals) {
153  while (id > idx) {
154  nodeInputSignals->push_back(
155  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
156  idx++;
157  }
158  nodeInputSignals->push_back(signal);
159  idx++;
160  }
161 }
162 
164  SPDLOG_LOGGER_INFO(mLog, "Closing InterfaceVillas...");
165  int ret = mNode->stop();
166  if (ret < 0) {
167  SPDLOG_LOGGER_ERROR(
168  mLog,
169  "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
170  ret);
171  std::exit(1);
172  }
173  mOpened = false;
174  ret = node::pool_destroy(&mSamplePool);
175  if (ret < 0) {
176  SPDLOG_LOGGER_ERROR(mLog,
177  "Error: failed to destroy SamplePool in "
178  "InterfaceVillas. pool_destroy returned code {}",
179  ret);
180  std::exit(1);
181  }
182 
183  mNode->getFactory()->stop();
184 
185  delete mNode;
186 }
187 
189  std::vector<Interface::AttributePacket> &updatedAttrs) {
190  Sample *sample = nullptr;
191  int ret = 0;
192  bool shouldRead = false;
193  try {
194  auto pollFds = mNode->getPollFDs();
195 
196  if (!pollFds.empty()) {
197  auto pfds = std::vector<struct pollfd>();
198 
199  for (auto pollFd : pollFds) {
200  pfds.push_back(pollfd{.fd = pollFd, .events = POLLIN});
201  }
202 
203  ret = ::poll(pfds.data(), pfds.size(), 0);
204 
205  if (ret < 0) {
206  SPDLOG_LOGGER_ERROR(mLog,
207  "Fatal error: failed to read sample from "
208  "InterfaceVillas. Poll returned code {}",
209  ret);
210  close();
211  std::exit(1);
212  }
213 
214  if (ret == 0) {
215  return;
216  }
217 
218  for (const auto &pfd : pfds) {
219  if (pfd.revents & POLLIN) {
220  shouldRead = true;
221  break;
222  }
223  }
224  } else {
225  //If the node does not support pollFds just do a blocking read
226  shouldRead = true;
227  }
228 
229  if (shouldRead) {
230  sample = node::sample_alloc(&mSamplePool);
231 
232  ret = mNode->read(&sample, 1);
233  if (ret < 0) {
234  SPDLOG_LOGGER_ERROR(mLog,
235  "Fatal error: failed to read sample from "
236  "InterfaceVillas. Read returned code {}",
237  ret);
238  close();
239  std::exit(1);
240  }
241 
242  if (ret != 0) {
243  for (UInt i = 0; i < mImports.size(); i++) {
244  auto importedAttr = std::get<0>(mImports[i])(sample);
245  if (!importedAttr.isNull()) {
246  updatedAttrs.emplace_back(Interface::AttributePacket{
247  importedAttr, i, mCurrentSequenceInterfaceToDpsim,
248  Interface::AttributePacketFlags::PACKET_NO_FLAGS});
249  mCurrentSequenceInterfaceToDpsim++;
250  }
251  }
252 
253  if (!pollFds.empty()) {
254  //Manually clear the event file descriptor since Villas does not do that for some reason
255  //See https://github.com/VILLASframework/node/issues/309
256  uint64_t result = 0;
257  ret = (int)::read(pollFds[0], &result, 8);
258  if (ret < 0) {
259  SPDLOG_LOGGER_WARN(
260  mLog, "Could not reset poll file descriptor! Read returned {}",
261  ret);
262  }
263  if (result > 1) {
264  result = result - 1;
265  ret = (int)::write(pollFds[0], (void *)&result, 8);
266  if (ret < 0) {
267  SPDLOG_LOGGER_WARN(
268  mLog,
269  "Could not decrement poll file descriptor! Write returned {}",
270  ret);
271  }
272  }
273  }
274  }
275 
276  sample_decref(sample);
277  }
278  } catch (const std::exception &) {
279  if (sample)
280  sample_decref(sample);
281 
282  throw;
283  }
284 }
285 
287  std::vector<Interface::AttributePacket> &updatedAttrs) {
288  //Update export sequence IDs
289  for (const auto &packet : updatedAttrs) {
290  if (std::get<1>(mExports[packet.attributeId]) < packet.sequenceId) {
291  std::get<1>(mExports[packet.attributeId]) = packet.sequenceId;
292  }
293  }
294 
295  //Remove outdated packets
296  auto beginOutdated = std::remove_if(
297  updatedAttrs.begin(), updatedAttrs.end(), [this](auto packet) {
298  return std::get<1>(mExports[packet.attributeId]) > packet.sequenceId;
299  });
300  updatedAttrs.erase(beginOutdated, updatedAttrs.end());
301 
302  Sample *sample = nullptr;
303  Int ret = 0;
304  bool done = false;
305  bool sampleFilled = false;
306  try {
307  sample = node::sample_alloc(&mSamplePool);
308  if (sample == nullptr) {
309  SPDLOG_LOGGER_ERROR(mLog, "InterfaceVillas could not allocate a new "
310  "sample! Not sending any data!");
311  return;
312  }
313 
314  sample->signals = mNode->getOutputSignals(false);
315  auto beginExported = std::remove_if(
316  updatedAttrs.begin(), updatedAttrs.end(),
317  [this, &sampleFilled, &sample](auto packet) {
318  if (!std::get<2>(mExports[packet.attributeId])) {
319  //Write attribute to sample ASAP
320  std::get<0>(mExports[packet.attributeId])(packet.value, sample);
321  sampleFilled = true;
322  return true;
323  }
324  return false;
325  });
326  updatedAttrs.erase(beginExported, updatedAttrs.end());
327 
328  //Check if the remaining packets form a complete set
329  if (((long)updatedAttrs.size()) ==
330  std::count_if(mExports.cbegin(), mExports.cend(),
331  [](auto x) { return std::get<2>(x); })) {
332  for (const auto &packet : updatedAttrs) {
333  std::get<0>(mExports[packet.attributeId])(packet.value, sample);
334  }
335  sampleFilled = true;
336  updatedAttrs.clear();
337  }
338 
339  if (sampleFilled) {
340  sample->sequence = mSequence++;
341  sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
342  sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
343  clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
344  sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
345  done = true;
346 
347  do {
348  ret = mNode->write(&sample, 1);
349  } while (ret == 0);
350  if (ret < 0)
351  SPDLOG_LOGGER_ERROR(mLog,
352  "Failed to write samples to InterfaceVillas. Write "
353  "returned code {}",
354  ret);
355 
356  sample_copy(mLastSample, sample);
357  }
358  sample_decref(sample);
359  } catch (const std::exception &) {
360  /* We need to at least send something, so determine where exactly the
361  * timer expired and either resend the last successfully sent sample or
362  * just try to send this one again.
363  * TODO: can this be handled better? */
364  if (!done)
365  sample = mLastSample;
366 
367  while (ret == 0)
368  ret = mNode->write(&sample, 1);
369 
370  sample_decref(sample);
371 
372  if (ret < 0)
373  SPDLOG_LOGGER_ERROR(
374  mLog,
375  "Failed to write samples to InterfaceVillas. Write returned code {}",
376  ret);
377 
378  /* Don't throw here, because we managed to send something */
379  }
380 }
381 
382 void InterfaceWorkerVillas::initVillas() const {
383  if (int ret = node::memory::init(villasHugePages); ret)
384  throw RuntimeError("Error: VillasNode failed to initialize memory system");
385 
386  villas::kernel::rt::init(villasPriority, villasAffinity);
387 }
388 
389 void InterfaceWorkerVillas::configureExport(UInt attributeId,
390  const std::type_info &type,
391  UInt idx, Bool waitForOnWrite,
392  const String &name,
393  const String &unit) {
394  if (mOpened) {
395  if (mLog != nullptr) {
396  SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
397  "Configuration will remain unchanged.");
398  }
399  return;
400  }
401  if (attributeId != mExports.size()) {
402  if (mLog != nullptr) {
403  SPDLOG_LOGGER_WARN(
404  mLog, "The exports already configured do not match with the given "
405  "attribute ID! Configuration will remain unchanged.");
406  }
407  return;
408  }
409 
410  if (type == typeid(Int)) {
411  mExports.emplace_back(
412  [idx](AttributeBase::Ptr attr, Sample *smp) {
413  if (idx >= smp->capacity)
414  throw std::out_of_range("not enough space in allocated sample");
415  if (idx >= smp->length)
416  smp->length = idx + 1;
417 
418  Attribute<Int>::Ptr attrTyped =
419  std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
420 
421  if (attrTyped.isNull())
423 
424  smp->data[idx].i = **attrTyped;
425  },
426  0, waitForOnWrite);
427  mExportSignals[idx] =
428  std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
429  } else if (type == typeid(Real)) {
430  mExports.emplace_back(
431  [idx](AttributeBase::Ptr attr, Sample *smp) {
432  if (idx >= smp->capacity)
433  throw std::out_of_range("not enough space in allocated sample");
434  if (idx >= smp->length)
435  smp->length = idx + 1;
436 
437  Attribute<Real>::Ptr attrTyped =
438  std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
439 
440  if (attrTyped.isNull())
442 
443  smp->data[idx].f = **attrTyped;
444  },
445  0, waitForOnWrite);
446  mExportSignals[idx] =
447  std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
448  } else if (type == typeid(Complex)) {
449  mExports.emplace_back(
450  [idx](AttributeBase::Ptr attr, Sample *smp) {
451  if (idx >= smp->capacity)
452  throw std::out_of_range("not enough space in allocated sample");
453  if (idx >= smp->length)
454  smp->length = idx + 1;
455 
456  Attribute<Complex>::Ptr attrTyped =
457  std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
458 
459  if (attrTyped.isNull())
461 
462  smp->data[idx].z = **attrTyped;
463  },
464  0, waitForOnWrite);
465  mExportSignals[idx] =
466  std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
467  } else if (type == typeid(Bool)) {
468  mExports.emplace_back(
469  [idx](AttributeBase::Ptr attr, Sample *smp) {
470  if (idx >= smp->capacity)
471  throw std::out_of_range("not enough space in allocated sample");
472  if (idx >= smp->length)
473  smp->length = idx + 1;
474 
475  Attribute<Bool>::Ptr attrTyped =
476  std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
477 
478  if (attrTyped.isNull())
480 
481  smp->data[idx].b = **attrTyped;
482  },
483  0, waitForOnWrite);
484  mExportSignals[idx] =
485  std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
486  } else {
487  if (mLog != nullptr) {
488  SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
489  "configuration will remain unchanged!");
490  }
491  }
492 }
493 
494 void InterfaceWorkerVillas::configureImport(UInt attributeId,
495  const std::type_info &type,
496  UInt idx, const String &name,
497  const String &unit) {
498  if (mOpened) {
499  if (mLog != nullptr) {
500  SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
501  "Configuration will remain unchanged.");
502  }
503  return;
504  }
505  if (attributeId != mImports.size()) {
506  if (mLog != nullptr) {
507  SPDLOG_LOGGER_WARN(
508  mLog, "The imports already configured do not match with the given "
509  "attribute ID! Configuration will remain unchanged.");
510  }
511  return;
512  }
513  const auto &log = mLog;
514 
515  if (type == typeid(Int)) {
516  mImports.emplace_back(
517  [idx, log](Sample *smp) -> AttributeBase::Ptr {
518  if (idx >= smp->length) {
519  log->error("incomplete data received from InterfaceVillas");
520  return nullptr;
521  }
523  AttributeStatic<Int>::make(smp->data[idx].i));
524  },
525  0);
526  mImportSignals[idx] =
527  std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
528  } else if (type == typeid(Real)) {
529  mImports.emplace_back(
530  [idx, log](Sample *smp) -> AttributeBase::Ptr {
531  if (idx >= smp->length) {
532  log->error("incomplete data received from InterfaceVillas");
533  return nullptr;
534  }
536  AttributeStatic<Real>::make(smp->data[idx].f));
537  },
538  0);
539  mImportSignals[idx] =
540  std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
541  } else if (type == typeid(Complex)) {
542  mImports.emplace_back(
543  [idx, log](Sample *smp) -> AttributeBase::Ptr {
544  if (idx >= smp->length) {
545  log->error("incomplete data received from InterfaceVillas");
546  return nullptr;
547  }
549  AttributeStatic<Complex>::make(smp->data[idx].z));
550  },
551  0);
552  mImportSignals[idx] =
553  std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
554  } else if (type == typeid(Bool)) {
555  mImports.emplace_back(
556  [idx, log](Sample *smp) -> AttributeBase::Ptr {
557  if (idx >= smp->length) {
558  log->error("incomplete data received from InterfaceVillas");
559  return nullptr;
560  }
562  AttributeStatic<Bool>::make(smp->data[idx].b));
563  },
564  0);
565  mImportSignals[idx] =
566  std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
567  } else {
568  if (mLog != nullptr) {
569  SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
570  "configuration will remain unchanged!");
571  }
572  }
573 }
574 
575 void InterfaceWorkerVillas::printSignals() const {
576  SPDLOG_LOGGER_INFO(
577  mLog,
578  "InterfaceWorkerVillas Settings: Queue Length: {}, Sample Length: {}",
579  mQueueLength, mSampleLength);
580  SPDLOG_LOGGER_INFO(mLog, "Export signals:");
581  for (const auto &[id, signal] : mExportSignals) {
582  SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
583  signal->name, signal->unit,
584  node::signalTypeToString(signal->type));
585  }
586 
587  SPDLOG_LOGGER_INFO(mLog, "Import signals:");
588  for (const auto &[id, signal] : mImportSignals) {
589  SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
590  signal->name, signal->unit,
591  node::signalTypeToString(signal->type));
592  }
593 }
void writeValuesToEnv(std::vector< Interface::AttributePacket > &updatedAttrs) override
void readValuesFromEnv(std::vector< Interface::AttributePacket > &updatedAttrs) override