12 #include <spdlog/sinks/stdout_color_sinks.h>
14 #include <dpsim-models/Logger.h>
15 #include <dpsim-villas/InterfaceWorkerVillas.h>
16 #include <villas/path.hpp>
17 #include <villas/signal_list.hpp>
// Bring the DPsim namespace into scope for the definitions in this file.
20 using namespace DPsim;
// Definitions of the static data members of InterfaceWorkerVillas.
// Guards the one-time VILLAS initialization; checked and set in open().
22 Bool InterfaceWorkerVillas::villasInitialized =
false;
// Affinity and priority values handed to villas::kernel::rt::init() in
// initVillas().
23 UInt InterfaceWorkerVillas::villasAffinity = 0;
24 UInt InterfaceWorkerVillas::villasPriority = 0;
// Value handed to node::memory::init() in initVillas().
25 UInt InterfaceWorkerVillas::villasHugePages = 100;
/// Constructs the worker from a node configuration string.
///
/// Stores nodeConfig, queueLength and sampleLength in the corresponding
/// members and fetches the logger named "InterfaceWorkerVillas", passing
/// logLevel for both level arguments of CPS::Logger::get().
27 InterfaceWorkerVillas::InterfaceWorkerVillas(
const String &nodeConfig,
30 spdlog::level::level_enum logLevel)
31 :
InterfaceWorker(), mNodeConfig(nodeConfig), mQueueLength(queueLength),
32 mSampleLength(sampleLength) {
33 mLog = CPS::Logger::get(
"InterfaceWorkerVillas", logLevel, logLevel);
/// Opens the interface: parses the JSON node configuration, creates the node
/// of the configured type and allocates the initial "last sample" buffer.
///
/// Steps, in order:
///  - apply this worker's log level to Log::getInstance(),
///  - on the first call, log and record the VILLAS initialization (static flag),
///  - parse mNodeConfig with json_loads(); a parse failure is logged and
///    reported by throwing JsonError,
///  - read the "type" key and create the node via node::NodeFactory::make(),
///  - let the node parse the config; parse and check failures are logged
///    together with the returned code,
///  - allocate mLastSample from mSamplePool and zero its sequence number,
///    origin timestamp and data.
36 void InterfaceWorkerVillas::open() {
37 SPDLOG_LOGGER_INFO(mLog,
"Opening InterfaceWorkerVillas...");
// Use the same level as this worker's logger for Log::getInstance()
// (presumably the VILLAS-side logging singleton - confirm).
38 Log::getInstance().setLevel(mLog->level());
// The flag is a static member, so once it is set later open() calls skip
// this branch.
40 if (!InterfaceWorkerVillas::villasInitialized) {
41 SPDLOG_LOGGER_INFO(mLog,
"Initializing Villas...");
43 InterfaceWorkerVillas::villasInitialized =
true;
// Parse the JSON node configuration held in mNodeConfig.
47 json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
48 if (config ==
nullptr) {
49 SPDLOG_LOGGER_ERROR(mLog,
"Error: Failed to parse node config! Error: {}",
51 throw JsonError(config, error);
// The "type" entry names the node type that the factory is asked to create.
54 const json_t *nodeType = json_object_get(config,
"type");
55 if (nodeType ==
nullptr) {
56 SPDLOG_LOGGER_ERROR(mLog,
"Error: Node config does not contain type-key!");
59 String nodeTypeString = json_string_value(nodeType);
61 mNode = node::NodeFactory::make(nodeTypeString);
// Hand the parsed configuration to the freshly created node.
64 ret = mNode->parse(config);
66 SPDLOG_LOGGER_ERROR(mLog,
67 "Error: Node in InterfaceVillas failed to parse "
68 "config. Parse returned code {}",
76 "Error: Node in InterfaceVillas failed check. Check returned code {}",
81 SPDLOG_LOGGER_INFO(mLog,
"Preparing VILLASNode instance...");
84 SPDLOG_LOGGER_INFO(mLog,
"Node is ready to send / receive data!");
// Allocate the sample kept in mLastSample and reset its header fields.
// NOTE(review): the result of sample_alloc() is dereferenced on the next
// line without a nullptr check.
88 mLastSample = node::sample_alloc(&mSamplePool);
89 mLastSample->signals = mNode->getInputSignals(
false);
90 mLastSample->sequence = 0;
91 mLastSample->ts.origin.tv_sec = 0;
92 mLastSample->ts.origin.tv_nsec = 0;
// Zero the payload of the freshly allocated sample.
// NOTE(review): the byte count is capacity * sizeof(float), but elsewhere in
// this file the data entries are accessed as a union (.i/.f/.z/.b). If one
// entry is wider than a float this clears only part of the buffer - confirm
// whether the size of one data entry was intended.
94 std::memset(&mLastSample->data, 0, mLastSample->capacity *
sizeof(
float));
/// Sets up the sample pool and starts the node.
///
/// Call order: node::pool_init() -> mNode->prepare() -> factory start ->
/// mNode->start(). The pool_init, prepare and start steps each have an error
/// log that reports the returned code.
97 void InterfaceWorkerVillas::prepareNode() {
// Pool of mQueueLength blocks, each sized for the Sample header plus the
// data area for mSampleLength values.
98 int ret = node::pool_init(&mSamplePool, mQueueLength,
99 sizeof(Sample) + SAMPLE_DATA_LENGTH(mSampleLength));
101 SPDLOG_LOGGER_ERROR(mLog,
102 "Error: InterfaceVillas failed to init sample pool. "
103 "pool_init returned code {}",
108 ret = mNode->prepare();
110 SPDLOG_LOGGER_ERROR(mLog,
111 "Error: Node in InterfaceVillas failed to prepare. "
112 "Prepare returned code {}",
116 SPDLOG_LOGGER_INFO(mLog,
"Node: {}", mNode->getNameFull());
// The node's factory is started before the node itself.
118 mNode->getFactory()->start(
121 ret = mNode->start();
123 SPDLOG_LOGGER_ERROR(mLog,
124 "Fatal error: failed to start node in InterfaceVillas. "
125 "Start returned code {}",
/// Builds the node's output and input signal lists from mExportSignals and
/// mImportSignals.
///
/// A new output path with an empty signal list is attached to the node, then
/// both lists are cleared and refilled from the configured signal maps.
132 void InterfaceWorkerVillas::setupNodeSignals() {
// NOTE(review): the path is created with a raw `new`; who releases it is not
// visible in this function - confirm ownership.
133 mNode->out.path =
new node::Path();
134 mNode->out.path->signals = std::make_shared<node::SignalList>();
135 node::SignalList::Ptr nodeOutputSignals =
136 mNode->out.path->getOutputSignals(
false);
137 nodeOutputSignals->clear();
// Fill the output list from the export signals, iterating in map order.
139 for (
const auto &[
id, signal] : mExportSignals) {
// Placeholder signal with empty name/unit and INVALID type (presumably
// padding so that list positions line up with the signal ids - confirm).
141 nodeOutputSignals->push_back(
142 std::make_shared<node::Signal>(
"",
"", node::SignalType::INVALID));
145 nodeOutputSignals->push_back(signal);
// Input side: take the node's input signal list, creating a new one if the
// node returns nullptr.
149 node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(
true);
150 if (nodeInputSignals ==
nullptr) {
151 nodeInputSignals = std::make_shared<node::SignalList>();
153 nodeInputSignals->clear();
// Fill the input list from the import signals, same scheme as for exports.
156 for (
const auto &[
id, signal] : mImportSignals) {
158 nodeInputSignals->push_back(
159 std::make_shared<node::Signal>(
"",
"", node::SignalType::INVALID));
162 nodeInputSignals->push_back(signal);
// Shutdown sequence: stop the node, destroy the sample pool, then stop the
// node's factory. The stop and pool_destroy steps each have an error log that
// reports the returned code.
168 SPDLOG_LOGGER_INFO(mLog,
"Closing InterfaceVillas...");
169 int ret = mNode->stop();
173 "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
// Release the pool that was set up with node::pool_init().
178 ret = node::pool_destroy(&mSamplePool);
180 SPDLOG_LOGGER_ERROR(mLog,
181 "Error: failed to destroy SamplePool in "
182 "InterfaceVillas. pool_destroy returned code {}",
187 mNode->getFactory()->stop();
// Receive path: polls the node's file descriptors, reads one sample from the
// node and appends one AttributePacket to updatedAttrs for every import that
// yields a non-null attribute.
193 std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
194 Sample *sample =
nullptr;
196 bool shouldRead =
false;
198 auto pollFds = mNode->getPollFDs();
// If the node exposes poll descriptors, check them for readable data first.
200 if (!pollFds.empty()) {
201 auto pfds = std::vector<struct pollfd>();
203 for (
auto pollFd : pollFds) {
204 pfds.push_back(pollfd{.fd = pollFd, .events = POLLIN});
// Timeout 0: poll() returns immediately instead of blocking.
207 ret = ::poll(pfds.data(), pfds.size(), 0);
210 SPDLOG_LOGGER_ERROR(mLog,
211 "Fatal error: failed to read sample from "
212 "InterfaceVillas. Poll returned code {}",
// Look for a descriptor that reported POLLIN.
222 for (
const auto &pfd : pfds) {
223 if (pfd.revents & POLLIN) {
// Take a sample from the pool and let the node fill it.
234 sample = node::sample_alloc(&mSamplePool);
236 ret = mNode->read(&sample, 1);
238 SPDLOG_LOGGER_ERROR(mLog,
239 "Fatal error: failed to read sample from "
240 "InterfaceVillas. Read returned code {}",
// Run every configured import callback (tuple element 0) on the sample.
247 for (UInt i = 0; i < mImports.size(); i++) {
248 auto importedAttr = std::get<0>(mImports[i])(sample);
249 if (!importedAttr.isNull()) {
// The packet carries the import index as attribute id and a sequence
// number that is incremented once per emitted packet.
250 updatedAttrs.emplace_back(InterfaceQueued::AttributePacket{
251 importedAttr, i, mCurrentSequenceInterfaceToDpsim,
252 InterfaceQueued::AttributePacketFlags::PACKET_NO_FLAGS});
253 mCurrentSequenceInterfaceToDpsim++;
257 if (!pollFds.empty()) {
// Read 8 bytes from the first poll descriptor and write 8 bytes back to it;
// per the log messages this resets and then decrements the descriptor
// (presumably an eventfd-style counter - confirm).
261 ret = (int)::read(pollFds[0], &result, 8);
264 mLog,
"Could not reset poll file descriptor! Read returned {}",
269 ret = (int)::write(pollFds[0], (
void *)&result, 8);
273 "Could not decrement poll file descriptor! Write returned {}",
// Drop this function's reference to the sample.
280 sample_decref(sample);
282 }
catch (
const std::exception &) {
// The reference is also dropped when an exception was caught.
284 sample_decref(sample);
// Send path: discards outdated packets, copies the remaining attribute values
// into a freshly allocated sample, stamps it with sequence number and origin
// timestamp and writes it to the node. The catch branch writes using
// mLastSample instead.
291 std::vector<InterfaceQueued::AttributePacket> &updatedAttrs) {
// Record the highest sequence id seen per export (tuple element 1).
293 for (
const auto &packet : updatedAttrs) {
294 if (std::get<1>(mExports[packet.attributeId]) < packet.sequenceId) {
295 std::get<1>(mExports[packet.attributeId]) = packet.sequenceId;
// Remove every packet whose sequence id is older than the recorded one.
// NOTE(review): the lambda takes `auto packet` by value, so each packet is
// copied per call; `const auto &` would avoid that.
300 auto beginOutdated = std::remove_if(
301 updatedAttrs.begin(), updatedAttrs.end(), [
this](
auto packet) {
302 return std::get<1>(mExports[packet.attributeId]) > packet.sequenceId;
304 updatedAttrs.erase(beginOutdated, updatedAttrs.end());
306 Sample *sample =
nullptr;
309 bool sampleFilled =
false;
311 sample = node::sample_alloc(&mSamplePool);
312 if (sample ==
nullptr) {
313 SPDLOG_LOGGER_ERROR(mLog,
"InterfaceVillas could not allocate a new "
314 "sample! Not sending any data!");
318 sample->signals = mNode->getOutputSignals(
false);
// First pass: exports whose tuple element 2 is false (presumably the
// waitForOnWrite flag from configureExport - confirm) are written into the
// sample through their export callback (tuple element 0).
319 auto beginExported = std::remove_if(
320 updatedAttrs.begin(), updatedAttrs.end(),
321 [
this, &sampleFilled, &sample](
auto packet) {
322 if (!std::get<2>(mExports[packet.attributeId])) {
324 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
330 updatedAttrs.erase(beginExported, updatedAttrs.end());
// Second pass: only when the number of remaining packets equals the number
// of exports with tuple element 2 set are those packets written into the
// sample and the list cleared.
333 if (((
long)updatedAttrs.size()) ==
334 std::count_if(mExports.cbegin(), mExports.cend(),
335 [](
auto x) { return std::get<2>(x); })) {
336 for (
const auto &packet : updatedAttrs) {
337 std::get<0>(mExports[packet.attributeId])(packet.value, sample);
340 updatedAttrs.clear();
// Stamp the sample: running sequence number, data flag and the current
// CLOCK_REALTIME time as origin timestamp.
344 sample->sequence = mSequence++;
345 sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
346 sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
347 clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
348 sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
352 ret = mNode->write(&sample, 1);
355 SPDLOG_LOGGER_ERROR(mLog,
356 "Failed to write samples to InterfaceVillas. Write "
// Keep a copy of this sample in mLastSample, then drop the reference.
360 sample_copy(mLastSample, sample);
362 sample_decref(sample);
363 }
catch (
const std::exception &) {
// Exception path: write using the copy kept in mLastSample.
369 sample = mLastSample;
372 ret = mNode->write(&sample, 1);
374 sample_decref(sample);
379 "Failed to write samples to InterfaceVillas. Write returned code {}",
/// Initializes the VILLAS memory system and real-time settings.
///
/// @throws RuntimeError if node::memory::init() returns a non-zero code.
386 void InterfaceWorkerVillas::initVillas()
const {
// Any non-zero return code from the memory system is treated as failure.
387 if (
int ret = node::memory::init(villasHugePages); ret)
388 throw RuntimeError(
"Error: VillasNode failed to initialize memory system");
// Apply the static priority and affinity settings.
390 villas::kernel::rt::init(villasPriority, villasAffinity);
/// Registers an export callback and its signal description for one attribute.
///
/// attributeId must equal the current number of configured exports, otherwise
/// a warning is logged. Supported attribute types are Int, Real, Complex and
/// Bool; any other type only produces a warning. For a supported type an entry
/// is appended to mExports and mExportSignals[idx] is set to a signal with the
/// given name, unit and the matching node::SignalType.
///
/// The export callback throws std::out_of_range when idx does not fit into the
/// sample's capacity, extends the sample length to idx + 1 if needed and
/// stores the attribute value in data[idx].
///
/// NOTE(review): the four type branches differ only in the data member
/// (.i/.f/.z/.b) and the SignalType - a candidate for a shared helper.
393 void InterfaceWorkerVillas::configureExport(UInt attributeId,
394 const std::type_info &type,
395 UInt idx, Bool waitForOnWrite,
397 const String &unit) {
399 if (mLog !=
nullptr) {
400 SPDLOG_LOGGER_WARN(mLog,
"InterfaceVillas has already been opened! "
401 "Configuration will remain unchanged.");
// Exports have to be configured in order: the next id is mExports.size().
405 if (attributeId != mExports.size()) {
406 if (mLog !=
nullptr) {
408 mLog,
"The exports already configured do not match with the given "
409 "attribute ID! Configuration will remain unchanged.");
// Integer attributes: stored in data[idx].i, signal type INTEGER.
414 if (type ==
typeid(Int)) {
415 mExports.emplace_back(
// Refuse to write past the space allocated for the sample.
417 if (idx >= smp->capacity)
418 throw std::out_of_range(
"not enough space in allocated sample");
// Grow the used length so that it covers index idx.
419 if (idx >= smp->length)
420 smp->length = idx + 1;
425 if (attrTyped.isNull())
428 smp->data[idx].i = **attrTyped;
431 mExportSignals[idx] =
432 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
433 }
// Real attributes: stored in data[idx].f, signal type FLOAT.
else if (type ==
typeid(Real)) {
434 mExports.emplace_back(
436 if (idx >= smp->capacity)
437 throw std::out_of_range(
"not enough space in allocated sample");
438 if (idx >= smp->length)
439 smp->length = idx + 1;
444 if (attrTyped.isNull())
447 smp->data[idx].f = **attrTyped;
450 mExportSignals[idx] =
451 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
452 }
// Complex attributes: stored in data[idx].z, signal type COMPLEX.
else if (type ==
typeid(Complex)) {
453 mExports.emplace_back(
455 if (idx >= smp->capacity)
456 throw std::out_of_range(
"not enough space in allocated sample");
457 if (idx >= smp->length)
458 smp->length = idx + 1;
463 if (attrTyped.isNull())
466 smp->data[idx].z = **attrTyped;
469 mExportSignals[idx] =
470 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
471 }
// Boolean attributes: stored in data[idx].b, signal type BOOLEAN.
else if (type ==
typeid(Bool)) {
472 mExports.emplace_back(
474 if (idx >= smp->capacity)
475 throw std::out_of_range(
"not enough space in allocated sample");
476 if (idx >= smp->length)
477 smp->length = idx + 1;
482 if (attrTyped.isNull())
485 smp->data[idx].b = **attrTyped;
488 mExportSignals[idx] =
489 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
// Any other type is rejected with a warning.
491 if (mLog !=
nullptr) {
492 SPDLOG_LOGGER_WARN(mLog,
"Unsupported attribute type! Interface "
493 "configuration will remain unchanged!");
/// Registers an import callback and its signal description for one attribute.
///
/// attributeId must equal the current number of configured imports, otherwise
/// a warning is logged. Supported attribute types are Int, Real, Complex and
/// Bool; any other type only produces a warning. For a supported type an entry
/// is appended to mImports and mImportSignals[idx] is set to a signal with the
/// given name, unit and the matching node::SignalType.
///
/// Each import callback logs "incomplete data received from InterfaceVillas"
/// when idx is not below the length of the received sample.
498 void InterfaceWorkerVillas::configureImport(UInt attributeId,
499 const std::type_info &type,
500 UInt idx,
const String &name,
501 const String &unit) {
503 if (mLog !=
nullptr) {
504 SPDLOG_LOGGER_WARN(mLog,
"InterfaceVillas has already been opened! "
505 "Configuration will remain unchanged.");
// Imports have to be configured in order: the next id is mImports.size().
509 if (attributeId != mImports.size()) {
510 if (mLog !=
nullptr) {
512 mLog,
"The imports already configured do not match with the given "
513 "attribute ID! Configuration will remain unchanged.");
// Local alias of the logger; the import callbacks below log through it.
517 const auto &log = mLog;
// Integer attributes: signal type INTEGER.
519 if (type ==
typeid(Int)) {
520 mImports.emplace_back(
// The received sample is too short to contain index idx.
522 if (idx >= smp->length) {
523 log->error(
"incomplete data received from InterfaceVillas");
530 mImportSignals[idx] =
531 std::make_shared<node::Signal>(name, unit, node::SignalType::INTEGER);
532 }
// Real attributes: signal type FLOAT.
else if (type ==
typeid(Real)) {
533 mImports.emplace_back(
535 if (idx >= smp->length) {
536 log->error(
"incomplete data received from InterfaceVillas");
543 mImportSignals[idx] =
544 std::make_shared<node::Signal>(name, unit, node::SignalType::FLOAT);
545 }
// Complex attributes: signal type COMPLEX.
else if (type ==
typeid(Complex)) {
546 mImports.emplace_back(
548 if (idx >= smp->length) {
549 log->error(
"incomplete data received from InterfaceVillas");
556 mImportSignals[idx] =
557 std::make_shared<node::Signal>(name, unit, node::SignalType::COMPLEX);
558 }
// Boolean attributes: signal type BOOLEAN.
else if (type ==
typeid(Bool)) {
559 mImports.emplace_back(
561 if (idx >= smp->length) {
562 log->error(
"incomplete data received from InterfaceVillas");
569 mImportSignals[idx] =
570 std::make_shared<node::Signal>(name, unit, node::SignalType::BOOLEAN);
// Any other type is rejected with a warning.
572 if (mLog !=
nullptr) {
573 SPDLOG_LOGGER_WARN(mLog,
"Unsupported attribute type! Interface "
574 "configuration will remain unchanged!");
/// Logs the interface settings (queue length and sample length) followed by
/// id, name, unit and type of every configured export and import signal.
579 void InterfaceWorkerVillas::printSignals()
const {
582 "InterfaceWorkerVillas Settings: Queue Length: {}, Sample Length: {}",
583 mQueueLength, mSampleLength);
// One info line per export signal, in map order.
584 SPDLOG_LOGGER_INFO(mLog,
"Export signals:");
585 for (
const auto &[
id, signal] : mExportSignals) {
586 SPDLOG_LOGGER_INFO(mLog,
"ID: {}, Name: {}, Unit: {}, Type: {}",
id,
587 signal->name, signal->unit,
588 node::signalTypeToString(signal->type));
// One info line per import signal, same format as for the exports.
591 SPDLOG_LOGGER_INFO(mLog,
"Import signals:");
592 for (
const auto &[
id, signal] : mImportSignals) {
593 SPDLOG_LOGGER_INFO(mLog,
"ID: {}, Name: {}, Unit: {}, Type: {}",
id,
594 signal->name, signal->unit,
595 node::signalTypeToString(signal->type));
void writeValuesToEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override
void readValuesFromEnv(std::vector< InterfaceQueued::AttributePacket > &updatedAttrs) override