DPsim
InterfaceVillasQueueless.cpp
1 /* Author: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
2  * SPDX-FileCopyrightText: 2023-2024 Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
3  * SPDX-License-Identifier: MPL-2.0
4  */
5 
6 #include <dpsim-villas/InterfaceVillasQueueless.h>
7 #include <dpsim-villas/InterfaceWorkerVillas.h>
8 #include <memory>
9 #include <spdlog/spdlog.h>
10 #include <typeinfo>
11 #include <villas/node.h>
12 #include <villas/node/memory.hpp>
13 #include <villas/node/memory_type.hpp>
14 #include <villas/path.hpp>
15 #include <villas/signal_type.hpp>
16 
17 using namespace CPS;
18 using namespace villas;
19 
20 namespace DPsim {
21 
22 InterfaceVillasQueueless::InterfaceVillasQueueless(
23  const String &nodeConfig, const String &name,
24  spdlog::level::level_enum logLevel)
25  : Interface(name, logLevel), mNodeConfig(nodeConfig), mNode(nullptr),
26  mSamplePool(), mSequenceToDpsim(0), mSequenceFromDpsim(0) {}
27 
28 void InterfaceVillasQueueless::createNode() {
29  if (villas::node::memory::init(100) != 0) {
30  SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to initialize memory subsystem!");
31  std::exit(1);
32  }
33  json_error_t error;
34  json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
35  if (config == nullptr) {
36  SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to parse node config! Error: {}",
37  error.text);
38  throw JsonError(config, error);
39  }
40 
41  const json_t *nodeType = json_object_get(config, "type");
42  if (nodeType == nullptr) {
43  SPDLOG_LOGGER_ERROR(mLog, "Error: Node config does not contain type-key!");
44  std::exit(1);
45  }
46  String nodeTypeString = json_string_value(nodeType);
47 
48  mNode = node::NodeFactory::make(nodeTypeString);
49 
50  int ret = 0;
51  ret = mNode->parse(config);
52  if (ret < 0) {
53  SPDLOG_LOGGER_ERROR(mLog,
54  "Error: Node in InterfaceVillas failed to parse "
55  "config. Parse returned code {}",
56  ret);
57  std::exit(1);
58  }
59  ret = mNode->check();
60  if (ret < 0) {
61  SPDLOG_LOGGER_ERROR(
62  mLog,
63  "Error: Node in InterfaceVillas failed check. Check returned code {}",
64  ret);
65  std::exit(1);
66  }
67  struct villas::node::memory::Type *pool_mt = &villas::node::memory::heap;
68  ret = node::pool_init(&mSamplePool, 16,
69  sizeof(node::Sample) + SAMPLE_DATA_LENGTH(64), pool_mt);
70  if (ret < 0) {
71  SPDLOG_LOGGER_ERROR(mLog,
72  "Error: InterfaceVillas failed to init sample pool. "
73  "pool_init returned code {}",
74  ret);
75  std::exit(1);
76  }
77 
78  ret = mNode->prepare();
79  if (ret < 0) {
80  SPDLOG_LOGGER_ERROR(mLog,
81  "Error: Node in InterfaceVillas failed to prepare. "
82  "Prepare returned code {}",
83  ret);
84  std::exit(1);
85  }
86  SPDLOG_LOGGER_INFO(mLog, "Node: {}", mNode->getNameFull());
87 }
88 
89 static node::SignalType stdTypeToNodeType(const std::type_info &type) {
90  if (type == typeid(Real)) {
91  return node::SignalType::FLOAT;
92  } else if (type == typeid(Int)) {
93  return node::SignalType::INTEGER;
94  } else if (type == typeid(Bool)) {
95  return node::SignalType::BOOLEAN;
96  } else if (type == typeid(Complex)) {
97  return node::SignalType::COMPLEX;
98  } else {
99  return node::SignalType::INVALID;
100  }
101 }
102 
103 void InterfaceVillasQueueless::createSignals() {
104  mNode->out.path = new node::Path();
105  mNode->out.path->signals = std::make_shared<node::SignalList>();
106  node::SignalList::Ptr nodeOutputSignals =
107  mNode->out.path->getOutputSignals(false);
108  nodeOutputSignals->clear();
109  unsigned int idx = 0;
110  for (const auto &[attr, id] : mExportAttrsDpsim) {
111  while (id > idx) {
112  nodeOutputSignals->push_back(
113  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
114  idx++;
115  }
116  nodeOutputSignals->push_back(std::make_shared<node::Signal>(
117  "", "", stdTypeToNodeType(attr->getType())));
118  }
119 
120  node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(true);
121  if (nodeInputSignals == nullptr) {
122  nodeInputSignals = std::make_shared<node::SignalList>();
123  } else {
124  nodeInputSignals->clear();
125  }
126  idx = 0;
127  for (const auto &[attr, id, blockOnRead, syncOnSimulationStart] :
128  mImportAttrsDpsim) {
129  while (id > idx) {
130  nodeInputSignals->push_back(
131  std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
132  idx++;
133  }
134  nodeInputSignals->push_back(std::make_shared<node::Signal>(
135  "", "", stdTypeToNodeType(attr->getType())));
136  idx++;
137  }
138 }
139 
140 void InterfaceVillasQueueless::open() {
141  createNode();
142  createSignals();
143 
144  // We have no SuperNode, so just hope type_start doesn't use it...
145  mNode->getFactory()->start(nullptr);
146 
147  auto ret = mNode->start();
148  if (ret < 0) {
149  SPDLOG_LOGGER_ERROR(mLog,
150  "Fatal error: failed to start node in InterfaceVillas. "
151  "Start returned code {}",
152  ret);
153  close();
154  std::exit(1);
155  }
156  mOpened = true;
157  mSequenceFromDpsim = 0;
158  mSequenceToDpsim = 0;
159 }
160 
161 void InterfaceVillasQueueless::close() {
162  SPDLOG_LOGGER_INFO(mLog, "Closing InterfaceVillas...");
163  int ret = mNode->stop();
164  if (ret < 0) {
165  SPDLOG_LOGGER_ERROR(
166  mLog,
167  "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
168  ret);
169  std::exit(1);
170  }
171  mOpened = false;
172  ret = node::pool_destroy(&mSamplePool);
173  if (ret < 0) {
174  SPDLOG_LOGGER_ERROR(mLog,
175  "Error: failed to destroy SamplePool in "
176  "InterfaceVillas. pool_destroy returned code {}",
177  ret);
178  std::exit(1);
179  }
180 
181  mNode->getFactory()->stop();
182 
183  delete mNode;
184  mOpened = false;
185 }
186 
187 CPS::Task::List InterfaceVillasQueueless::getTasks() {
188  auto tasks = CPS::Task::List();
189  if (!mImportAttrsDpsim.empty()) {
190  tasks.push_back(std::make_shared<InterfaceVillasQueueless::PreStep>(*this));
191  }
192  if (!mExportAttrsDpsim.empty()) {
193  tasks.push_back(
194  std::make_shared<InterfaceVillasQueueless::PostStep>(*this));
195  }
196  return tasks;
197 }
198 
199 void InterfaceVillasQueueless::PreStep::execute(Real time, Int timeStepCount) {
200  auto seqnum = mIntf.readFromVillas();
201  static size_t overrunCounter = 0;
202  if (seqnum != mIntf.mSequenceToDpsim + 1) {
203  overrunCounter++;
204  }
205  if (overrunCounter > 10000) {
206  SPDLOG_LOGGER_WARN(mIntf.mLog, "{} Overrun(s) detected!", overrunCounter);
207  overrunCounter = 0;
208  }
209  mIntf.mSequenceToDpsim = seqnum;
210 }
211 
212 Int InterfaceVillasQueueless::readFromVillas() {
213  node::Sample *sample = nullptr;
214  Int seqnum = 0;
215  int ret = 0;
216  if (mImportAttrsDpsim.size() == 0) {
217  return 0;
218  }
219  try {
220  sample = node::sample_alloc(&mSamplePool);
221  ret = 0;
222  while (ret == 0) {
223  ret = mNode->read(&sample, 1);
224  if (ret < 0) {
225  SPDLOG_LOGGER_ERROR(mLog,
226  "Fatal error: failed to read sample from "
227  "InterfaceVillas. Read returned code {}",
228  ret);
229  close();
230  std::exit(1);
231  } else if (ret == 0) {
232  SPDLOG_LOGGER_WARN(mLog,
233  "InterfaceVillas read returned 0. Retrying...");
234  }
235  }
236 
237  if (sample->length != mImportAttrsDpsim.size()) {
238  SPDLOG_LOGGER_ERROR(mLog,
239  "Error: Received Sample length ({}) does not match "
240  "configured attributes length ({})",
241  sample->length, mImportAttrsDpsim.size());
242  throw RuntimeError(
243  "Received Sample length does not match configured attributes length");
244  }
245 
246  for (size_t i = 0; i < mImportAttrsDpsim.size(); i++) {
247  auto attr = std::get<0>(mImportAttrsDpsim[i]);
248  if (attr->getType() == typeid(Real)) {
249  auto attrReal =
250  std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
251  attrReal->set(sample->data[i].f);
252  } else if (attr->getType() == typeid(Int)) {
253  auto attrInt = std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
254  attrInt->set(sample->data[i].i);
255  if (i == 0) {
256  seqnum = sample->data[i].i;
257  }
258  } else if (attr->getType() == typeid(Bool)) {
259  auto attrBool =
260  std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
261  attrBool->set(sample->data[i].b);
262  } else if (attr->getType() == typeid(Complex)) {
263  auto attrComplex =
264  std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
265  attrComplex->set(
266  Complex(sample->data[i].z.real(), sample->data[i].z.imag()));
267  } else {
268  SPDLOG_LOGGER_ERROR(mLog, "Error: Unsupported attribute type!");
269  throw RuntimeError("Unsupported attribute type!");
270  }
271  }
272 
273  sample_decref(sample);
274  } catch (const std::exception &) {
275  if (sample)
276  sample_decref(sample);
277 
278  throw;
279  }
280  return seqnum;
281 }
282 
283 void InterfaceVillasQueueless::PostStep::execute(Real time, Int timeStepCount) {
284  mIntf.writeToVillas();
285 }
286 
287 void InterfaceVillasQueueless::writeToVillas() {
288  if (mExportAttrsDpsim.size() == 0) {
289  return;
290  }
291  node::Sample *sample = nullptr;
292  Int ret = 0;
293  try {
294  sample = node::sample_alloc(&mSamplePool);
295  if (sample == nullptr) {
296  SPDLOG_LOGGER_ERROR(mLog, "InterfaceVillas could not allocate a new "
297  "sample! Not sending any data!");
298  return;
299  }
300 
301  sample->signals = mNode->getOutputSignals(false);
302 
303  for (size_t i = 0; i < mExportAttrsDpsim.size(); i++) {
304  auto attr = std::get<0>(mExportAttrsDpsim[i]);
305  if (attr->getType() == typeid(Real)) {
306  auto attrReal =
307  std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
308  sample->data[i].f = attrReal->get();
309  } else if (attr->getType() == typeid(Int)) {
310  auto attrInt = std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
311  sample->data[i].i = attrInt->get();
312  } else if (attr->getType() == typeid(Bool)) {
313  auto attrBool =
314  std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
315  sample->data[i].b = attrBool->get();
316  } else if (attr->getType() == typeid(Complex)) {
317  auto attrComplex =
318  std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
319  sample->data[i].z = std::complex<float>(attrComplex->get().real(),
320  attrComplex->get().imag());
321  } else {
322  SPDLOG_LOGGER_ERROR(mLog, "Error: Unsupported attribute type!");
323  throw RuntimeError("Unsupported attribute type!");
324  }
325  }
326 
327  sample->length = mExportAttrsDpsim.size();
328  sample->sequence = mSequenceFromDpsim++;
329  sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
330  sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
331  clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
332  sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
333 
334  do {
335  ret = mNode->write(&sample, 1);
336  } while (ret == 0);
337  if (ret < 0)
338  SPDLOG_LOGGER_ERROR(mLog,
339  "Failed to write samples to InterfaceVillas. Write "
340  "returned code {}",
341  ret);
342 
343  sample_decref(sample);
344  } catch (const std::exception &) {
345  sample_decref(sample);
346 
347  if (ret < 0)
348  SPDLOG_LOGGER_ERROR(
349  mLog,
350  "Failed to write samples to InterfaceVillas. Write returned code {}",
351  ret);
352 
353  // Don't throw here, because we managed to send something
354  }
355 }
356 
358  // Block on read until all attributes with syncOnSimulationStart are read
359  mSequenceToDpsim = this->readFromVillas();
360 }
361 
362 void InterfaceVillasQueueless::syncExports() {
363  // Just push all the attributes
364  this->writeToVillas();
365 }
366 
367 void InterfaceVillasQueueless::printVillasSignals() const {
368  SPDLOG_LOGGER_INFO(mLog, "Export signals:");
369  for (const auto &signal : *mNode->getOutputSignals(true)) {
370  SPDLOG_LOGGER_INFO(mLog, "Name: {}, Unit: {}, Type: {}", signal->name,
371  signal->unit, node::signalTypeToString(signal->type));
372  }
373 
374  SPDLOG_LOGGER_INFO(mLog, "Import signals:");
375  for (const auto &signal : *mNode->getInputSignals(true)) {
376  SPDLOG_LOGGER_INFO(mLog, "Name: {}, Unit: {}, Type: {}", signal->name,
377  signal->unit, node::signalTypeToString(signal->type));
378  }
379 }
380 
381 } // namespace DPsim
virtual void syncImports() override
Function called by the Simulation to perform interface synchronization.