DPsim
Loading...
Searching...
No Matches
InterfaceWorkerVillas.cpp
1// SPDX-License-Identifier: Apache-2.0
2
3#include <cstdio>
4#include <cstdlib>
5#include <stdexcept>
6
7#include <chrono>
8#include <poll.h>
9#include <thread>
10#include <unistd.h>
11
12#include <spdlog/sinks/stdout_color_sinks.h>
13
14#include <dpsim-models/Logger.h>
15#include <dpsim-villas/InterfaceWorkerVillas.h>
16#include <villas/path.hpp>
17#include <villas/signal_list.hpp>
18
19using namespace CPS;
20using namespace DPsim;
21
22Bool InterfaceWorkerVillas::villasInitialized = false;
23UInt InterfaceWorkerVillas::villasAffinity = 0;
24UInt InterfaceWorkerVillas::villasPriority = 0;
25UInt InterfaceWorkerVillas::villasHugePages = 100;
26
27InterfaceWorkerVillas::InterfaceWorkerVillas(const String &nodeConfig,
28 UInt queueLength,
29 UInt sampleLength,
30 spdlog::level::level_enum logLevel)
31 : InterfaceWorker(), mNodeConfig(nodeConfig), mQueueLength(queueLength),
32 mSampleLength(sampleLength) {
33 mLog = CPS::Logger::get("InterfaceWorkerVillas", logLevel, logLevel);
34}
35
37 SPDLOG_LOGGER_INFO(mLog, "Opening InterfaceWorkerVillas...");
38 Log::getInstance().setLevel(mLog->level());
39
40 if (!InterfaceWorkerVillas::villasInitialized) {
41 SPDLOG_LOGGER_INFO(mLog, "Initializing Villas...");
42 initVillas();
43 InterfaceWorkerVillas::villasInitialized = true;
44 }
45
46 json_error_t error;
47 json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
48 if (config == nullptr) {
49 SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to parse node config! Error: {}",
50 error.text);
51 throw JsonError(config, error);
52 }
53
54 const json_t *nodeType = json_object_get(config, "type");
55 if (nodeType == nullptr) {
56 SPDLOG_LOGGER_ERROR(mLog, "Error: Node config does not contain type-key!");
57 std::exit(1);
58 }
59 String nodeTypeString = json_string_value(nodeType);
60
61 mNode = node::NodeFactory::make(nodeTypeString);
62
63 int ret = 0;
64 ret = mNode->parse(config);
65 if (ret < 0) {
66 SPDLOG_LOGGER_ERROR(mLog,
67 "Error: Node in InterfaceVillas failed to parse "
68 "config. Parse returned code {}",
69 ret);
70 std::exit(1);
71 }
72 ret = mNode->check();
73 if (ret < 0) {
74 SPDLOG_LOGGER_ERROR(
75 mLog,
76 "Error: Node in InterfaceVillas failed check. Check returned code {}",
77 ret);
78 std::exit(1);
79 }
80
81 SPDLOG_LOGGER_INFO(mLog, "Preparing VILLASNode instance...");
82 setupNodeSignals();
83 prepareNode();
84 SPDLOG_LOGGER_INFO(mLog, "Node is ready to send / receive data!");
85 mOpened = true;
86
87 mSequence = 0;
88 mLastSample = node::sample_alloc(&mSamplePool);
89 mLastSample->signals = mNode->getInputSignals(false);
90 mLastSample->sequence = 0;
91 mLastSample->ts.origin.tv_sec = 0;
92 mLastSample->ts.origin.tv_nsec = 0;
93
94 std::memset(&mLastSample->data, 0, mLastSample->capacity * sizeof(float));
95}
96
97void InterfaceWorkerVillas::prepareNode() {
98 int ret = node::pool_init(&mSamplePool, mQueueLength,
99 sizeof(Sample) + SAMPLE_DATA_LENGTH(mSampleLength));
100 if (ret < 0) {
101 SPDLOG_LOGGER_ERROR(mLog,
102 "Error: InterfaceVillas failed to init sample pool. "
103 "pool_init returned code {}",
104 ret);
105 std::exit(1);
106 }
107
108 ret = mNode->prepare();
109 if (ret < 0) {
110 SPDLOG_LOGGER_ERROR(mLog,
111 "Error: Node in InterfaceVillas failed to prepare. "
112 "Prepare returned code {}",
113 ret);
114 std::exit(1);
115 }
116 SPDLOG_LOGGER_INFO(mLog, "Node: {}", mNode->getNameFull());
117
118 mNode->getFactory()->start(
119 nullptr); //We have no SuperNode, so just hope type_start doesnt use it...
120
121 ret = mNode->start();
122 if (ret < 0) {
123 SPDLOG_LOGGER_ERROR(mLog,
124 "Fatal error: failed to start node in InterfaceVillas. "
125 "Start returned code {}",
126 ret);
127 close();
128 std::exit(1);
129 }
130}
131
132void InterfaceWorkerVillas::setupNodeSignals() {
133 mNode->out.path = new node::Path();
134 mNode->out.path->signals = std::make_shared<node::SignalList>();
135 node::SignalList::Ptr nodeOutputSignals =
136 mNode->out.path->getOutputSignals(false);
137 nodeOutputSignals->clear();
138 int idx = 0;
139 for (const auto &[id, signal] : mExportSignals) {
140 while (id > idx) {
141 nodeOutputSignals->push_back(
142 std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
143 idx++;
144 }
145 nodeOutputSignals->push_back(signal);
146 idx++;
147 }
148
149 node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(true);
150 if (nodeInputSignals == nullptr) {
151 nodeInputSignals = std::make_shared<node::SignalList>();
152 } else {
153 nodeInputSignals->clear();
154 }
155 idx = 0;
156 for (const auto &[id, signal] : mImportSignals) {
157 while (id > idx) {
158 nodeInputSignals->push_back(
159 std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
160 idx++;
161 }
162 nodeInputSignals->push_back(signal);
163 idx++;
164 }
165}
166
168 SPDLOG_LOGGER_INFO(mLog, "Closing InterfaceVillas...");
169 int ret = mNode->stop();
170 if (ret < 0) {
171 SPDLOG_LOGGER_ERROR(
172 mLog,
173 "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
174 ret);
175 std::exit(1);
176 }
177 mOpened = false;
178 ret = node::pool_destroy(&mSamplePool);
179 if (ret < 0) {
180 SPDLOG_LOGGER_ERROR(mLog,
181 "Error: failed to destroy SamplePool in "
182 "InterfaceVillas. pool_destroy returned code {}",
183 ret);
184 std::exit(1);
185 }
186
187 mNode->getFactory()->stop();
188
189 delete mNode;
190}
191
193 std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
194 Sample *sample = nullptr;
195 int ret = 0;
196 bool shouldRead = false;
197 try {
198 auto pollFds = mNode->getPollFDs();
199
200 if (!pollFds.empty()) {
201 auto pfds = std::vector<struct pollfd>();
202
203 for (auto pollFd : pollFds) {
204 pfds.push_back(pollfd{.fd = pollFd, .events = POLLIN});
205 }
206
207 ret = ::poll(pfds.data(), pfds.size(), 0);
208
209 if (ret < 0) {
210 SPDLOG_LOGGER_ERROR(mLog,
211 "Fatal error: failed to read sample from "
212 "InterfaceVillas. Poll returned code {}",
213 ret);
214 close();
215 std::exit(1);
216 }
217
218 if (ret == 0) {
219 return;
220 }
221
222 for (const auto &pfd : pfds) {
223 if (pfd.revents & POLLIN) {
224 shouldRead = true;
225 break;
226 }
227 }
228 } else {
229 //If the node does not support pollFds just do a blocking read
230 shouldRead = true;
231 }
232
233 if (shouldRead) {
234 sample = node::sample_alloc(&mSamplePool);
235
236 ret = mNode->read(&sample, 1);
237 if (ret < 0) {
238 SPDLOG_LOGGER_ERROR(mLog,
239 "Fatal error: failed to read sample from "
240 "InterfaceVillas. Read returned code {}",
241 ret);
242 close();
243 std::exit(1);
244 }
245
246 if (ret != 0) {
247 for (UInt i = 0; i < mImports.size(); i++) {
248 auto importedAttr = std::get<0>(mImports[i])(sample);
249 if (!importedAttr.isNull()) {
250 updatedAttrs.emplace_back(InterfaceQueued::AttributePacket{
251 importedAttr, i, mCurrentSequenceInterfaceToDpsim,
252 InterfaceQueued::AttributePacketFlags::PACKET_NO_FLAGS});
253 mCurrentSequenceInterfaceToDpsim++;
254 }
255 }
256
257 if (!pollFds.empty()) {
258 //Manually clear the event file descriptor since Villas does not do that for some reason
259 //See https://github.com/VILLASframework/node/issues/309
260 uint64_t result = 0;
261 ret = (int)::read(pollFds[0], &result, 8);
262 if (ret < 0) {
263 SPDLOG_LOGGER_WARN(
264 mLog, "Could not reset poll file descriptor! Read returned {}",
265 ret);
266 }
267 if (result > 1) {
268 result = result - 1;
269 ret = (int)::write(pollFds[0], (void *)&result, 8);
270 if (ret < 0) {
271 SPDLOG_LOGGER_WARN(
272 mLog,
273 "Could not decrement poll file descriptor! Write returned {}",
274 ret);
275 }
276 }
277 }
278 }
279
280 sample_decref(sample);
281 }
282 } catch (const std::exception &) {
283 if (sample)
284 sample_decref(sample);
285
286 throw;
287 }
288}
289
291 std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
292 // Update export sequence IDs
293 for (const auto &packet : updatedAttrs) {
294 if (std::get<1>(mExports[packet.attributeId]) < packet.sequenceId) {
295 std::get<1>(mExports[packet.attributeId]) = packet.sequenceId;
296 }
297 }
298
299 // Remove outdated packets
300 auto beginOutdated = std::remove_if(
301 updatedAttrs.begin(), updatedAttrs.end(), [this](auto packet) {
302 return std::get<1>(mExports[packet.attributeId]) > packet.sequenceId;
303 });
304 updatedAttrs.erase(beginOutdated, updatedAttrs.end());
305
306 Sample *sample = nullptr;
307 Int ret = 0;
308 bool done = false;
309 bool sampleFilled = false;
310 try {
311 sample = node::sample_alloc(&mSamplePool);
312 if (sample == nullptr) {
313 SPDLOG_LOGGER_ERROR(mLog, "InterfaceVillas could not allocate a new "
314 "sample! Not sending any data!");
315 return;
316 }
317
318 sample->signals = mNode->getOutputSignals(false);
319 auto beginExported = std::remove_if(
320 updatedAttrs.begin(), updatedAttrs.end(),
321 [this, &sampleFilled, &sample](auto packet) {
322 if (!std::get<2>(mExports[packet.attributeId])) {
323 //Write attribute to sample ASAP
324 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
325 sampleFilled = true;
326 return true;
327 }
328 return false;
329 });
330 updatedAttrs.erase(beginExported, updatedAttrs.end());
331
332 //Check if the remaining packets form a complete set
333 if (((long)updatedAttrs.size()) ==
334 std::count_if(mExports.cbegin(), mExports.cend(),
335 [](auto x) { return std::get<2>(x); })) {
336 for (const auto &packet : updatedAttrs) {
337 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
338 }
339 sampleFilled = true;
340 updatedAttrs.clear();
341 }
342
343 if (sampleFilled) {
344 sample->sequence = mSequence++;
345 sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
346 sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
347 clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
348 sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
349 done = true;
350
351 do {
352 ret = mNode->write(&sample, 1);
353 } while (ret == 0);
354 if (ret < 0)
355 SPDLOG_LOGGER_ERROR(mLog,
356 "Failed to write samples to InterfaceVillas. Write "
357 "returned code {}",
358 ret);
359
360 sample_copy(mLastSample, sample);
361 }
362 sample_decref(sample);
363 } catch (const std::exception &) {
364 /* We need to at least send something, so determine where exactly the
365 * timer expired and either resend the last successfully sent sample or
366 * just try to send this one again.
367 * TODO: can this be handled better? */
368 if (!done)
369 sample = mLastSample;
370
371 while (ret == 0)
372 ret = mNode->write(&sample, 1);
373
374 sample_decref(sample);
375
376 if (ret < 0)
377 SPDLOG_LOGGER_ERROR(
378 mLog,
379 "Failed to write samples to InterfaceVillas. Write returned code {}",
380 ret);
381
382 /* Don't throw here, because we managed to send something */
383 }
384}
385
386void InterfaceWorkerVillas::initVillas() const {
387 if (int ret = node::memory::init(villasHugePages); ret)
388 throw RuntimeError("Error: VillasNode failed to initialize memory system");
389
390 villas::kernel::rt::init(villasPriority, villasAffinity);
391}
392
393void InterfaceWorkerVillas::configureExport(UInt attributeId,
394 const std::type_info &type,
395 UInt idx, Bool waitForOnWrite,
396 const String &name,
397 const String &unit) {
398 if (mOpened) {
399 if (mLog != nullptr) {
400 SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
401 "Configuration will remain unchanged.");
402 }
403 return;
404 }
405 if (attributeId != mExports.size()) {
406 if (mLog != nullptr) {
407 SPDLOG_LOGGER_WARN(
408 mLog, "The exports already configured do not match with the given "
409 "attribute ID! Configuration will remain unchanged.");
410 }
411 return;
412 }
413
414 if (type == typeid(Int)) {
415 mExports.emplace_back(
416 [idx](AttributeBase::Ptr attr, Sample *smp) {
417 if (idx >= smp->capacity)
418 throw std::out_of_range("not enough space in allocated sample");
419 if (idx >= smp->length)
420 smp->length = idx + 1;
421
422 Attribute<Int>::Ptr attrTyped =
423 std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
424
425 if (attrTyped.isNull())
426 throw InvalidAttributeException();
427
428 smp->data[idx].i = **attrTyped;
429 },
430 0, waitForOnWrite);
431 mExportSignals[idx] =
432 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
433 } else if (type == typeid(Real)) {
434 mExports.emplace_back(
435 [idx](AttributeBase::Ptr attr, Sample *smp) {
436 if (idx >= smp->capacity)
437 throw std::out_of_range("not enough space in allocated sample");
438 if (idx >= smp->length)
439 smp->length = idx + 1;
440
441 Attribute<Real>::Ptr attrTyped =
442 std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
443
444 if (attrTyped.isNull())
445 throw InvalidAttributeException();
446
447 smp->data[idx].f = **attrTyped;
448 },
449 0, waitForOnWrite);
450 mExportSignals[idx] =
451 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
452 } else if (type == typeid(Complex)) {
453 mExports.emplace_back(
454 [idx](AttributeBase::Ptr attr, Sample *smp) {
455 if (idx >= smp->capacity)
456 throw std::out_of_range("not enough space in allocated sample");
457 if (idx >= smp->length)
458 smp->length = idx + 1;
459
460 Attribute<Complex>::Ptr attrTyped =
461 std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
462
463 if (attrTyped.isNull())
464 throw InvalidAttributeException();
465
466 smp->data[idx].z = **attrTyped;
467 },
468 0, waitForOnWrite);
469 mExportSignals[idx] =
470 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
471 } else if (type == typeid(Bool)) {
472 mExports.emplace_back(
473 [idx](AttributeBase::Ptr attr, Sample *smp) {
474 if (idx >= smp->capacity)
475 throw std::out_of_range("not enough space in allocated sample");
476 if (idx >= smp->length)
477 smp->length = idx + 1;
478
479 Attribute<Bool>::Ptr attrTyped =
480 std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
481
482 if (attrTyped.isNull())
483 throw InvalidAttributeException();
484
485 smp->data[idx].b = **attrTyped;
486 },
487 0, waitForOnWrite);
488 mExportSignals[idx] =
489 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
490 } else {
491 if (mLog != nullptr) {
492 SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
493 "configuration will remain unchanged!");
494 }
495 }
496}
497
498void InterfaceWorkerVillas::configureImport(UInt attributeId,
499 const std::type_info &type,
500 UInt idx, const String &name,
501 const String &unit) {
502 if (mOpened) {
503 if (mLog != nullptr) {
504 SPDLOG_LOGGER_WARN(mLog, "InterfaceVillas has already been opened! "
505 "Configuration will remain unchanged.");
506 }
507 return;
508 }
509 if (attributeId != mImports.size()) {
510 if (mLog != nullptr) {
511 SPDLOG_LOGGER_WARN(
512 mLog, "The imports already configured do not match with the given "
513 "attribute ID! Configuration will remain unchanged.");
514 }
515 return;
516 }
517 const auto &log = mLog;
518
519 if (type == typeid(Int)) {
520 mImports.emplace_back(
521 [idx, log](Sample *smp) -> AttributeBase::Ptr {
522 if (idx >= smp->length) {
523 log->error("incomplete data received from InterfaceVillas");
524 return nullptr;
525 }
526 return AttributePointer<AttributeBase>(
527 AttributeStatic<Int>::make(smp->data[idx].i));
528 },
529 0);
530 mImportSignals[idx] =
531 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
532 } else if (type == typeid(Real)) {
533 mImports.emplace_back(
534 [idx, log](Sample *smp) -> AttributeBase::Ptr {
535 if (idx >= smp->length) {
536 log->error("incomplete data received from InterfaceVillas");
537 return nullptr;
538 }
539 return AttributePointer<AttributeBase>(
540 AttributeStatic<Real>::make(smp->data[idx].f));
541 },
542 0);
543 mImportSignals[idx] =
544 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
545 } else if (type == typeid(Complex)) {
546 mImports.emplace_back(
547 [idx, log](Sample *smp) -> AttributeBase::Ptr {
548 if (idx >= smp->length) {
549 log->error("incomplete data received from InterfaceVillas");
550 return nullptr;
551 }
552 return AttributePointer<AttributeBase>(
553 AttributeStatic<Complex>::make(smp->data[idx].z));
554 },
555 0);
556 mImportSignals[idx] =
557 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
558 } else if (type == typeid(Bool)) {
559 mImports.emplace_back(
560 [idx, log](Sample *smp) -> AttributeBase::Ptr {
561 if (idx >= smp->length) {
562 log->error("incomplete data received from InterfaceVillas");
563 return nullptr;
564 }
565 return AttributePointer<AttributeBase>(
566 AttributeStatic<Bool>::make(smp->data[idx].b));
567 },
568 0);
569 mImportSignals[idx] =
570 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
571 } else {
572 if (mLog != nullptr) {
573 SPDLOG_LOGGER_WARN(mLog, "Unsupported attribute type! Interface "
574 "configuration will remain unchanged!");
575 }
576 }
577}
578
579void InterfaceWorkerVillas::printSignals() const {
580 SPDLOG_LOGGER_INFO(
581 mLog,
582 "InterfaceWorkerVillas Settings: Queue Length: {}, Sample Length: {}",
583 mQueueLength, mSampleLength);
584 SPDLOG_LOGGER_INFO(mLog, "Export signals:");
585 for (const auto &[id, signal] : mExportSignals) {
586 SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
587 signal->name, signal->unit,
588 node::signalTypeToString(signal->type));
589 }
590
591 SPDLOG_LOGGER_INFO(mLog, "Import signals:");
592 for (const auto &[id, signal] : mImportSignals) {
593 SPDLOG_LOGGER_INFO(mLog, "ID: {}, Name: {}, Unit: {}, Type: {}", id,
594 signal->name, signal->unit,
595 node::signalTypeToString(signal->type));
596 }
597}
void writeValuesToEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override
void readValuesFromEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override