12 #include <spdlog/sinks/stdout_color_sinks.h>
14 #include <dpsim-models/Logger.h>
15 #include <dpsim-villas/InterfaceWorkerVillas.h>
16 #include <villas/path.hpp>
17 #include <villas/signal_list.hpp>
20 using namespace DPsim;
// Definitions of the static members of InterfaceWorkerVillas; being static,
// they are shared by every worker instance.
// Checked and then set to true inside open(), so the initialisation branch
// there is entered only while this flag is still false.
22 Bool InterfaceWorkerVillas::villasInitialized =
false;
// Passed as the second argument to villas::kernel::rt::init() in initVillas().
23 UInt InterfaceWorkerVillas::villasAffinity = 0;
// Passed as the first argument to villas::kernel::rt::init() in initVillas().
24 UInt InterfaceWorkerVillas::villasPriority = 0;
// Passed to node::memory::init() in initVillas().
25 UInt InterfaceWorkerVillas::villasHugePages = 100;
// Constructor: stores the node configuration string, the queue length and the
// sample length in the corresponding members; no other work is done here.
// NOTE(review): the parameter lines declaring queueLength and sampleLength are
// not visible in this listing - confirm their types against the class header.
27 InterfaceWorkerVillas::InterfaceWorkerVillas(
const String &nodeConfig,
30 :
InterfaceWorker(), mNodeConfig(nodeConfig), mQueueLength(queueLength),
31 mSampleLength(sampleLength) {}
// Opens the interface. Visible steps: a one-time initialisation branch guarded
// by the static villasInitialized flag, parsing mNodeConfig as JSON, creating
// the node named by the config's "type" key, letting the node parse the
// config, and allocating and resetting mLastSample.
// Throws JsonError when the configuration string cannot be parsed.
// NOTE(review): this listing elides several original lines (for example the
// declarations of `error` and `ret` and the tests that guard the error logs);
// the comments below describe only the statements that are visible.
33 void InterfaceWorkerVillas::open() {
34 SPDLOG_LOGGER_INFO(mLog,
"Opening InterfaceWorkerVillas...");
// The flag is static, so this branch is skipped once any worker has set it.
36 if (!InterfaceWorkerVillas::villasInitialized) {
37 SPDLOG_LOGGER_INFO(mLog,
"Initializing Villas...");
39 InterfaceWorkerVillas::villasInitialized =
true;
// Parse the node configuration string; a null result is logged and then
// reported to the caller as a JsonError.
43 json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
44 if (config ==
nullptr) {
45 SPDLOG_LOGGER_ERROR(mLog,
"Error: Failed to parse node config! Error: {}",
47 throw JsonError(config, error);
// The "type" entry of the config selects which node the factory creates.
50 const json_t *nodeType = json_object_get(config,
"type");
51 if (nodeType ==
nullptr) {
52 SPDLOG_LOGGER_ERROR(mLog,
"Error: Node config does not contain type-key!");
55 String nodeTypeString = json_string_value(nodeType);
57 mNode = node::NodeFactory::make(nodeTypeString);
// The node receives the complete JSON config for its own parsing.
60 ret = mNode->parse(config);
62 SPDLOG_LOGGER_ERROR(mLog,
63 "Error: Node in InterfaceVillas failed to parse "
64 "config. Parse returned code {}",
72 "Error: Node in InterfaceVillas failed check. Check returned code {}",
77 SPDLOG_LOGGER_INFO(mLog,
"Preparing VILLASNode instance...");
80 SPDLOG_LOGGER_INFO(mLog,
"Node is ready to send / receive data!");
// Allocate the sample kept in mLastSample from the pool and reset its
// sequence number and origin timestamp.
84 mLastSample = node::sample_alloc(&mSamplePool);
85 mLastSample->signals = mNode->getInputSignals(
false);
86 mLastSample->sequence = 0;
87 mLastSample->ts.origin.tv_sec = 0;
88 mLastSample->ts.origin.tv_nsec = 0;
// NOTE(review): the byte count is capacity * sizeof(float). If one element of
// `data` is wider than a float, this clears only part of the buffer - confirm
// against the Sample definition (sizeof(mLastSample->data[0]) may be intended).
90 std::memset(&mLastSample->data, 0, mLastSample->capacity *
sizeof(
float));
// Initialises the sample pool, prepares the node, starts the node's factory
// and finally starts the node itself, logging an error for each failing step.
// NOTE(review): the tests on `ret` guarding each error log, the arguments of
// the factory start() call and any reaction after the logs are not visible in
// this listing.
93 void InterfaceWorkerVillas::prepareNode() {
// mSamplePool is set up from mQueueLength and a size built from
// sizeof(Sample) plus SAMPLE_DATA_LENGTH(mSampleLength).
94 int ret = node::pool_init(&mSamplePool, mQueueLength,
95 sizeof(Sample) + SAMPLE_DATA_LENGTH(mSampleLength));
97 SPDLOG_LOGGER_ERROR(mLog,
98 "Error: InterfaceVillas failed to init sample pool. "
99 "pool_init returned code {}",
104 ret = mNode->prepare();
106 SPDLOG_LOGGER_ERROR(mLog,
107 "Error: Node in InterfaceVillas failed to prepare. "
108 "Prepare returned code {}",
112 SPDLOG_LOGGER_INFO(mLog,
"Node: {}", mNode->getNameFull());
// The factory belonging to the node is started before the node.
114 mNode->getFactory()->start(
117 ret = mNode->start();
119 SPDLOG_LOGGER_ERROR(mLog,
120 "Fatal error: failed to start node in InterfaceVillas. "
121 "Start returned code {}",
// Rebuilds the node's output and input signal lists from mExportSignals and
// mImportSignals respectively.
// NOTE(review): the conditions that decide when an INVALID placeholder signal
// is pushed are not visible in this listing; presumably they pad gaps between
// signal IDs - confirm against the full source.
128 void InterfaceWorkerVillas::setupNodeSignals() {
// A new Path with an empty SignalList is attached to the node's output side.
// NOTE(review): allocated with raw `new`; who releases it is not visible here.
129 mNode->out.path =
new node::Path();
130 mNode->out.path->signals = std::make_shared<node::SignalList>();
131 node::SignalList::Ptr nodeOutputSignals =
132 mNode->out.path->getOutputSignals(
false);
133 nodeOutputSignals->clear();
// Fill the output list: placeholder signals of type INVALID and the configured
// export signals are appended in iteration order of mExportSignals.
135 for (
const auto &[
id, signal] : mExportSignals) {
137 nodeOutputSignals->push_back(
138 std::make_shared<node::Signal>(
"",
"", node::SignalType::INVALID));
141 nodeOutputSignals->push_back(signal);
// Input side: use the node's input signal list, creating a new list when the
// node returns none, and start from an empty list.
145 node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(
true);
146 if (nodeInputSignals ==
nullptr) {
147 nodeInputSignals = std::make_shared<node::SignalList>();
149 nodeInputSignals->clear();
// Same filling scheme as for the output list, driven by mImportSignals.
152 for (
const auto &[
id, signal] : mImportSignals) {
154 nodeInputSignals->push_back(
155 std::make_shared<node::Signal>(
"",
"", node::SignalType::INVALID));
158 nodeInputSignals->push_back(signal);
// Shutdown sequence: stop the node, destroy the sample pool, then stop the
// node's factory.
// NOTE(review): the signature line of this function and the tests on `ret`
// guarding the error logs are not visible in this listing; the log text
// suggests this is the close routine of the interface.
164 SPDLOG_LOGGER_INFO(mLog,
"Closing InterfaceVillas...");
165 int ret = mNode->stop();
169 "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
174 ret = node::pool_destroy(&mSamplePool);
176 SPDLOG_LOGGER_ERROR(mLog,
177 "Error: failed to destroy SamplePool in "
178 "InterfaceVillas. pool_destroy returned code {}",
183 mNode->getFactory()->stop();
// Reads a sample from the node and appends one AttributePacket to updatedAttrs
// for every import function that yields a non-null attribute.
// NOTE(review): the signature line, the declarations of `ret` and `result`,
// the use of `shouldRead` and the tests guarding the error logs are not
// visible in this listing.
189 std::vector<Interface::AttributePacket> &updatedAttrs) {
190 Sample *sample =
nullptr;
192 bool shouldRead =
false;
// File descriptors offered by the node for polling; the list may be empty.
194 auto pollFds = mNode->getPollFDs();
196 if (!pollFds.empty()) {
197 auto pfds = std::vector<struct pollfd>();
199 for (
auto pollFd : pollFds) {
200 pfds.push_back(pollfd{.fd = pollFd, .events = POLLIN});
// A timeout of 0 makes poll() return immediately instead of blocking.
203 ret = ::poll(pfds.data(), pfds.size(), 0);
206 SPDLOG_LOGGER_ERROR(mLog,
207 "Fatal error: failed to read sample from "
208 "InterfaceVillas. Poll returned code {}",
// Look for descriptors that reported readable data.
218 for (
const auto &pfd : pfds) {
219 if (pfd.revents & POLLIN) {
// Take a fresh sample from the pool and let the node fill it.
230 sample = node::sample_alloc(&mSamplePool);
232 ret = mNode->read(&sample, 1);
234 SPDLOG_LOGGER_ERROR(mLog,
235 "Fatal error: failed to read sample from "
236 "InterfaceVillas. Read returned code {}",
// Element 0 of each mImports entry is the import function; it is applied to
// the sample, and every non-null result becomes a packet tagged with the
// import index and the current interface-to-DPsim sequence number, which is
// incremented after each packet.
243 for (UInt i = 0; i < mImports.size(); i++) {
244 auto importedAttr = std::get<0>(mImports[i])(sample);
245 if (!importedAttr.isNull()) {
246 updatedAttrs.emplace_back(Interface::AttributePacket{
247 importedAttr, i, mCurrentSequenceInterfaceToDpsim,
248 Interface::AttributePacketFlags::PACKET_NO_FLAGS});
249 mCurrentSequenceInterfaceToDpsim++;
// When poll descriptors exist, 8 bytes are read from the first one into
// `result` and 8 bytes are written back from it.
// NOTE(review): the log texts call this resetting/decrementing the descriptor;
// any modification of `result` in between is not visible here.
253 if (!pollFds.empty()) {
257 ret = (int)::read(pollFds[0], &result, 8);
260 mLog,
"Could not reset poll file descriptor! Read returned {}",
265 ret = (int)::write(pollFds[0], (
void *)&result, 8);
269 "Could not decrement poll file descriptor! Write returned {}",
// The sample reference is dropped on the regular path and also in the
// exception handler below.
276 sample_decref(sample);
278 }
catch (
const std::exception &) {
280 sample_decref(sample);
// Writes updated attribute values into a sample and sends it through the node.
// Each mExports entry is a tuple: element 0 is the export function, element 1
// the newest sequence ID seen for that attribute, element 2 a boolean flag.
// NOTE(review): the signature line, the declaration of `ret`, the use of
// `sampleFilled`, several lambda/branch bodies and the tests guarding the
// error logs are not visible in this listing.
287 std::vector<Interface::AttributePacket> &updatedAttrs) {
// Record, per attribute, the highest sequence ID among the incoming packets.
289 for (
const auto &packet : updatedAttrs) {
290 if (std::get<1>(mExports[packet.attributeId]) < packet.sequenceId) {
291 std::get<1>(mExports[packet.attributeId]) = packet.sequenceId;
// Drop every packet that is older than the recorded sequence ID of its
// attribute.
296 auto beginOutdated = std::remove_if(
297 updatedAttrs.begin(), updatedAttrs.end(), [
this](
auto packet) {
298 return std::get<1>(mExports[packet.attributeId]) > packet.sequenceId;
300 updatedAttrs.erase(beginOutdated, updatedAttrs.end());
302 Sample *sample =
nullptr;
305 bool sampleFilled =
false;
// A failed allocation is logged; per the message nothing is sent in that case.
307 sample = node::sample_alloc(&mSamplePool);
308 if (sample ==
nullptr) {
309 SPDLOG_LOGGER_ERROR(mLog,
"InterfaceVillas could not allocate a new "
310 "sample! Not sending any data!");
314 sample->signals = mNode->getOutputSignals(
false);
// Packets whose export entry has element 2 == false are written into the
// sample by their export function inside this predicate.
// NOTE(review): the predicate's return statements are not visible; the erase
// below removes whatever packets it selects.
315 auto beginExported = std::remove_if(
316 updatedAttrs.begin(), updatedAttrs.end(),
317 [
this, &sampleFilled, &sample](
auto packet) {
318 if (!std::get<2>(mExports[packet.attributeId])) {
320 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
326 updatedAttrs.erase(beginExported, updatedAttrs.end());
// The remaining packets are written only when their count equals the number
// of export entries whose element 2 is true; the packet list is then cleared.
329 if (((
long)updatedAttrs.size()) ==
330 std::count_if(mExports.cbegin(), mExports.cend(),
331 [](
auto x) { return std::get<2>(x); })) {
332 for (
const auto &packet : updatedAttrs) {
333 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
336 updatedAttrs.clear();
// Stamp the sample: running sequence number, data flag and the current
// CLOCK_REALTIME value as origin timestamp.
340 sample->sequence = mSequence++;
341 sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
342 sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
343 clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
344 sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
348 ret = mNode->write(&sample, 1);
351 SPDLOG_LOGGER_ERROR(mLog,
352 "Failed to write samples to InterfaceVillas. Write "
// Keep a copy of what was sent in mLastSample, then drop the reference.
356 sample_copy(mLastSample, sample);
358 sample_decref(sample);
359 }
catch (
const std::exception &) {
// On an exception the previously stored mLastSample is written instead.
// NOTE(review): `sample` aliases mLastSample here and is then passed to
// sample_decref - confirm this does not release the stored sample.
365 sample = mLastSample;
368 ret = mNode->write(&sample, 1);
370 sample_decref(sample);
375 "Failed to write samples to InterfaceVillas. Write returned code {}",
// Global VILLASnode initialisation: sets up the memory system using
// villasHugePages and then calls the real-time initialisation with
// villasPriority and villasAffinity.
// Throws RuntimeError when node::memory::init returns a non-zero value.
// NOTE(review): the closing brace of this function is not visible in this
// listing.
382 void InterfaceWorkerVillas::initVillas()
const {
383 if (
int ret = node::memory::init(villasHugePages); ret)
384 throw RuntimeError(
"Error: VillasNode failed to initialize memory system");
386 villas::kernel::rt::init(villasPriority, villasAffinity);
// Registers one export: depending on `type` (Int, Real, Complex or Bool) an
// entry is appended to mExports and a Signal with the matching SignalType is
// stored in mExportSignals[idx]. Any other type only produces a warning.
// The visible parts of each export callback check idx against the sample's
// capacity (throwing std::out_of_range), enlarge the sample's length to cover
// idx, test the typed attribute for null and copy its value into data[idx].
// NOTE(review): one parameter line (presumably declaring `name`), the lambda
// headers, the outer conditions around the first warning and the statements
// following the warnings are not visible in this listing.
389 void InterfaceWorkerVillas::configureExport(UInt attributeId,
390 const std::type_info &type,
391 UInt idx, Bool waitForOnWrite,
393 const String &unit) {
// Warning path for a configuration attempt after the interface was opened;
// the logger is only used when it exists.
395 if (mLog !=
nullptr) {
396 SPDLOG_LOGGER_WARN(mLog,
"InterfaceVillas has already been opened! "
397 "Configuration will remain unchanged.");
// attributeId must equal the number of exports configured so far.
401 if (attributeId != mExports.size()) {
402 if (mLog !=
nullptr) {
404 mLog,
"The exports already configured do not match with the given "
405 "attribute ID! Configuration will remain unchanged.");
// Int attributes are written to the integer member of the sample data.
410 if (type ==
typeid(Int)) {
411 mExports.emplace_back(
413 if (idx >= smp->capacity)
414 throw std::out_of_range(
"not enough space in allocated sample");
415 if (idx >= smp->length)
416 smp->length = idx + 1;
421 if (attrTyped.isNull())
424 smp->data[idx].i = **attrTyped;
427 mExportSignals[idx] =
428 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
429 }
// Real attributes are written to the floating-point member.
else if (type ==
typeid(Real)) {
430 mExports.emplace_back(
432 if (idx >= smp->capacity)
433 throw std::out_of_range(
"not enough space in allocated sample");
434 if (idx >= smp->length)
435 smp->length = idx + 1;
440 if (attrTyped.isNull())
443 smp->data[idx].f = **attrTyped;
446 mExportSignals[idx] =
447 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
448 }
// Complex attributes are written to the complex member.
else if (type ==
typeid(Complex)) {
449 mExports.emplace_back(
451 if (idx >= smp->capacity)
452 throw std::out_of_range(
"not enough space in allocated sample");
453 if (idx >= smp->length)
454 smp->length = idx + 1;
459 if (attrTyped.isNull())
462 smp->data[idx].z = **attrTyped;
465 mExportSignals[idx] =
466 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
467 }
// Bool attributes are written to the boolean member.
else if (type ==
typeid(Bool)) {
468 mExports.emplace_back(
470 if (idx >= smp->capacity)
471 throw std::out_of_range(
"not enough space in allocated sample");
472 if (idx >= smp->length)
473 smp->length = idx + 1;
478 if (attrTyped.isNull())
481 smp->data[idx].b = **attrTyped;
484 mExportSignals[idx] =
485 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
// Fallback for every other type: warn if a logger is available.
487 if (mLog !=
nullptr) {
488 SPDLOG_LOGGER_WARN(mLog,
"Unsupported attribute type! Interface "
489 "configuration will remain unchanged!");
// Registers one import: depending on `type` (Int, Real, Complex or Bool) an
// entry is appended to mImports and a Signal with the matching SignalType is
// stored in mImportSignals[idx]. Any other type only produces a warning.
// The visible part of each import callback logs an error when idx is not
// below the received sample's length.
// NOTE(review): the lambda headers, the rest of the callback bodies, the outer
// conditions around the first warning and the statements following the
// warnings are not visible in this listing.
494 void InterfaceWorkerVillas::configureImport(UInt attributeId,
495 const std::type_info &type,
496 UInt idx,
const String &name,
497 const String &unit) {
// Warning path for a configuration attempt after the interface was opened;
// the logger is only used when it exists.
499 if (mLog !=
nullptr) {
500 SPDLOG_LOGGER_WARN(mLog,
"InterfaceVillas has already been opened! "
501 "Configuration will remain unchanged.");
// attributeId must equal the number of imports configured so far.
505 if (attributeId != mImports.size()) {
506 if (mLog !=
nullptr) {
508 mLog,
"The imports already configured do not match with the given "
509 "attribute ID! Configuration will remain unchanged.");
// Local reference to the logger; the log->error calls below go through it.
513 const auto &log = mLog;
515 if (type ==
typeid(Int)) {
516 mImports.emplace_back(
518 if (idx >= smp->length) {
519 log->error(
"incomplete data received from InterfaceVillas");
526 mImportSignals[idx] =
527 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
528 }
else if (type ==
typeid(Real)) {
529 mImports.emplace_back(
531 if (idx >= smp->length) {
532 log->error(
"incomplete data received from InterfaceVillas");
539 mImportSignals[idx] =
540 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
541 }
else if (type ==
typeid(Complex)) {
542 mImports.emplace_back(
544 if (idx >= smp->length) {
545 log->error(
"incomplete data received from InterfaceVillas");
552 mImportSignals[idx] =
553 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
554 }
else if (type ==
typeid(Bool)) {
555 mImports.emplace_back(
557 if (idx >= smp->length) {
558 log->error(
"incomplete data received from InterfaceVillas");
565 mImportSignals[idx] =
566 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
// Fallback for every other type: warn if a logger is available.
568 if (mLog !=
nullptr) {
569 SPDLOG_LOGGER_WARN(mLog,
"Unsupported attribute type! Interface "
570 "configuration will remain unchanged!");
// Logs the worker settings (queue length and sample length) followed by one
// info line per configured export signal and per configured import signal,
// each showing ID, name, unit and the signal type as a string.
// NOTE(review): the opening of the first log call and the closing braces are
// not visible in this listing.
575 void InterfaceWorkerVillas::printSignals()
const {
578 "InterfaceWorkerVillas Settings: Queue Length: {}, Sample Length: {}",
579 mQueueLength, mSampleLength);
580 SPDLOG_LOGGER_INFO(mLog,
"Export signals:");
// One line per entry of mExportSignals.
581 for (
const auto &[
id, signal] : mExportSignals) {
582 SPDLOG_LOGGER_INFO(mLog,
"ID: {}, Name: {}, Unit: {}, Type: {}",
id,
583 signal->name, signal->unit,
584 node::signalTypeToString(signal->type));
587 SPDLOG_LOGGER_INFO(mLog,
"Import signals:");
// One line per entry of mImportSignals.
588 for (
const auto &[
id, signal] : mImportSignals) {
589 SPDLOG_LOGGER_INFO(mLog,
"ID: {}, Name: {}, Unit: {}, Type: {}",
id,
590 signal->name, signal->unit,
591 node::signalTypeToString(signal->type));
void writeValuesToEnv(std::vector< Interface::AttributePacket > &updatedAttrs) override
void readValuesFromEnv(std::vector< Interface::AttributePacket > &updatedAttrs) override