DPsim
Loading...
Searching...
No Matches
InterfaceVillasQueueless.cpp
1/* Author: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
2 * SPDX-FileCopyrightText: 2023-2024 Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
3 * SPDX-License-Identifier: MPL-2.0
4 */
5
6#include <dpsim-villas/InterfaceVillasQueueless.h>
7#include <dpsim-villas/InterfaceWorkerVillas.h>
8#include <memory>
9#include <spdlog/spdlog.h>
10#include <typeinfo>
11#include <villas/node.h>
12#include <villas/node/memory.hpp>
13#include <villas/node/memory_type.hpp>
14#include <villas/path.hpp>
15#include <villas/signal_type.hpp>
16
17using namespace CPS;
18using namespace villas;
19
20namespace DPsim {
21
23 const String &nodeConfig, const String &name,
24 spdlog::level::level_enum logLevel)
25 : Interface(name, logLevel), mNodeConfig(nodeConfig), mNode(nullptr),
26 mSamplePool(), mSequenceToDpsim(0), mSequenceFromDpsim(0) {}
27
28void InterfaceVillasQueueless::createNode() {
29 if (villas::node::memory::init(100) != 0) {
30 SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to initialize memory subsystem!");
31 std::exit(1);
32 }
33 json_error_t error;
34 json_t *config = json_loads(mNodeConfig.c_str(), 0, &error);
35 if (config == nullptr) {
36 SPDLOG_LOGGER_ERROR(mLog, "Error: Failed to parse node config! Error: {}",
37 error.text);
38 throw JsonError(config, error);
39 }
40
41 const json_t *nodeType = json_object_get(config, "type");
42 if (nodeType == nullptr) {
43 SPDLOG_LOGGER_ERROR(mLog, "Error: Node config does not contain type-key!");
44 std::exit(1);
45 }
46 String nodeTypeString = json_string_value(nodeType);
47
48 mNode = node::NodeFactory::make(nodeTypeString);
49
50 int ret = 0;
51 ret = mNode->parse(config);
52 if (ret < 0) {
53 SPDLOG_LOGGER_ERROR(mLog,
54 "Error: Node in InterfaceVillas failed to parse "
55 "config. Parse returned code {}",
56 ret);
57 std::exit(1);
58 }
59 ret = mNode->check();
60 if (ret < 0) {
61 SPDLOG_LOGGER_ERROR(
62 mLog,
63 "Error: Node in InterfaceVillas failed check. Check returned code {}",
64 ret);
65 std::exit(1);
66 }
67 struct villas::node::memory::Type *pool_mt = &villas::node::memory::heap;
68 ret = node::pool_init(&mSamplePool, 16,
69 sizeof(node::Sample) + SAMPLE_DATA_LENGTH(64), pool_mt);
70 if (ret < 0) {
71 SPDLOG_LOGGER_ERROR(mLog,
72 "Error: InterfaceVillas failed to init sample pool. "
73 "pool_init returned code {}",
74 ret);
75 std::exit(1);
76 }
77
78 ret = mNode->prepare();
79 if (ret < 0) {
80 SPDLOG_LOGGER_ERROR(mLog,
81 "Error: Node in InterfaceVillas failed to prepare. "
82 "Prepare returned code {}",
83 ret);
84 std::exit(1);
85 }
86 SPDLOG_LOGGER_INFO(mLog, "Node: {}", mNode->getNameFull());
87}
88
89static node::SignalType stdTypeToNodeType(const std::type_info &type) {
90 if (type == typeid(Real)) {
91 return node::SignalType::FLOAT;
92 } else if (type == typeid(Int)) {
93 return node::SignalType::INTEGER;
94 } else if (type == typeid(Bool)) {
95 return node::SignalType::BOOLEAN;
96 } else if (type == typeid(Complex)) {
97 return node::SignalType::COMPLEX;
98 } else {
99 return node::SignalType::INVALID;
100 }
101}
102
103void InterfaceVillasQueueless::createSignals() {
104 mNode->out.path = new node::Path();
105 mNode->out.path->signals = std::make_shared<node::SignalList>();
106 node::SignalList::Ptr nodeOutputSignals =
107 mNode->out.path->getOutputSignals(false);
108 nodeOutputSignals->clear();
109 unsigned int idx = 0;
110 for (const auto &[attr, id] : mExportAttrsDpsim) {
111 while (id > idx) {
112 nodeOutputSignals->push_back(
113 std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
114 idx++;
115 }
116 nodeOutputSignals->push_back(std::make_shared<node::Signal>(
117 "", "", stdTypeToNodeType(attr->getType())));
118 }
119
120 node::SignalList::Ptr nodeInputSignals = mNode->getInputSignals(true);
121 if (nodeInputSignals == nullptr) {
122 nodeInputSignals = std::make_shared<node::SignalList>();
123 } else {
124 nodeInputSignals->clear();
125 }
126 idx = 0;
127 for (const auto &[attr, id, blockOnRead, syncOnSimulationStart] :
128 mImportAttrsDpsim) {
129 while (id > idx) {
130 nodeInputSignals->push_back(
131 std::make_shared<node::Signal>("", "", node::SignalType::INVALID));
132 idx++;
133 }
134 nodeInputSignals->push_back(std::make_shared<node::Signal>(
135 "", "", stdTypeToNodeType(attr->getType())));
136 idx++;
137 }
138}
139
140void InterfaceVillasQueueless::open() {
141 createNode();
142 createSignals();
143
144 // We have no SuperNode, so just hope type_start doesn't use it...
145 mNode->getFactory()->start(nullptr);
146
147 auto ret = mNode->start();
148 if (ret < 0) {
149 SPDLOG_LOGGER_ERROR(mLog,
150 "Fatal error: failed to start node in InterfaceVillas. "
151 "Start returned code {}",
152 ret);
153 close();
154 std::exit(1);
155 }
156 mOpened = true;
157 mSequenceFromDpsim = 0;
158 mSequenceToDpsim = 0;
159}
160
161void InterfaceVillasQueueless::close() {
162 SPDLOG_LOGGER_INFO(mLog, "Closing InterfaceVillas...");
163 int ret = mNode->stop();
164 if (ret < 0) {
165 SPDLOG_LOGGER_ERROR(
166 mLog,
167 "Error: failed to stop node in InterfaceVillas. Stop returned code {}",
168 ret);
169 std::exit(1);
170 }
171 mOpened = false;
172 ret = node::pool_destroy(&mSamplePool);
173 if (ret < 0) {
174 SPDLOG_LOGGER_ERROR(mLog,
175 "Error: failed to destroy SamplePool in "
176 "InterfaceVillas. pool_destroy returned code {}",
177 ret);
178 std::exit(1);
179 }
180
181 mNode->getFactory()->stop();
182
183 delete mNode;
184 mOpened = false;
185}
186
187CPS::Task::List InterfaceVillasQueueless::getTasks() {
188 auto tasks = CPS::Task::List();
189 if (!mImportAttrsDpsim.empty()) {
190 tasks.push_back(std::make_shared<InterfaceVillasQueueless::PreStep>(*this));
191 }
192 if (!mExportAttrsDpsim.empty()) {
193 tasks.push_back(
194 std::make_shared<InterfaceVillasQueueless::PostStep>(*this));
195 }
196 return tasks;
197}
198
199void InterfaceVillasQueueless::PreStep::execute(Real time, Int timeStepCount) {
200 auto seqnum = mIntf.readFromVillas();
201 static size_t overrunCounter = 0;
202 if (seqnum != mIntf.mSequenceToDpsim + 1) {
203 overrunCounter++;
204 }
205 if (overrunCounter > 10000) {
206 SPDLOG_LOGGER_WARN(mIntf.mLog, "{} Overrun(s) detected!", overrunCounter);
207 overrunCounter = 0;
208 }
209 mIntf.mSequenceToDpsim = seqnum;
210}
211
212Int InterfaceVillasQueueless::readFromVillas() {
213 node::Sample *sample = nullptr;
214 Int seqnum = 0;
215 int ret = 0;
216 if (mImportAttrsDpsim.size() == 0) {
217 return 0;
218 }
219 try {
220 sample = node::sample_alloc(&mSamplePool);
221 ret = 0;
222 while (ret == 0) {
223 ret = mNode->read(&sample, 1);
224 if (ret < 0) {
225 SPDLOG_LOGGER_ERROR(mLog,
226 "Fatal error: failed to read sample from "
227 "InterfaceVillas. Read returned code {}",
228 ret);
229 close();
230 std::exit(1);
231 } else if (ret == 0) {
232 SPDLOG_LOGGER_WARN(mLog,
233 "InterfaceVillas read returned 0. Retrying...");
234 }
235 }
236
237 if (sample->length != mImportAttrsDpsim.size()) {
238 SPDLOG_LOGGER_ERROR(mLog,
239 "Error: Received Sample length ({}) does not match "
240 "configured attributes length ({})",
241 sample->length, mImportAttrsDpsim.size());
242 throw RuntimeError(
243 "Received Sample length does not match configured attributes length");
244 }
245
246 for (size_t i = 0; i < mImportAttrsDpsim.size(); i++) {
247 auto attr = std::get<0>(mImportAttrsDpsim[i]);
248 if (attr->getType() == typeid(Real)) {
249 auto attrReal =
250 std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
251 attrReal->set(sample->data[i].f);
252 } else if (attr->getType() == typeid(Int)) {
253 auto attrInt = std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
254 attrInt->set(sample->data[i].i);
255 if (i == 0) {
256 seqnum = sample->data[i].i;
257 }
258 } else if (attr->getType() == typeid(Bool)) {
259 auto attrBool =
260 std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
261 attrBool->set(sample->data[i].b);
262 } else if (attr->getType() == typeid(Complex)) {
263 auto attrComplex =
264 std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
265 attrComplex->set(
266 Complex(sample->data[i].z.real(), sample->data[i].z.imag()));
267 } else {
268 SPDLOG_LOGGER_ERROR(mLog, "Error: Unsupported attribute type!");
269 throw RuntimeError("Unsupported attribute type!");
270 }
271 }
272
273 sample_decref(sample);
274 } catch (const std::exception &) {
275 if (sample)
276 sample_decref(sample);
277
278 throw;
279 }
280 return seqnum;
281}
282
283void InterfaceVillasQueueless::PostStep::execute(Real time, Int timeStepCount) {
284 mIntf.writeToVillas();
285}
286
287void InterfaceVillasQueueless::writeToVillas() {
288 if (mExportAttrsDpsim.size() == 0) {
289 return;
290 }
291 node::Sample *sample = nullptr;
292 Int ret = 0;
293 try {
294 sample = node::sample_alloc(&mSamplePool);
295 if (sample == nullptr) {
296 SPDLOG_LOGGER_ERROR(mLog, "InterfaceVillas could not allocate a new "
297 "sample! Not sending any data!");
298 return;
299 }
300
301 sample->signals = mNode->getOutputSignals(false);
302
303 for (size_t i = 0; i < mExportAttrsDpsim.size(); i++) {
304 auto attr = std::get<0>(mExportAttrsDpsim[i]);
305 if (attr->getType() == typeid(Real)) {
306 auto attrReal =
307 std::dynamic_pointer_cast<Attribute<Real>>(attr.getPtr());
308 sample->data[i].f = attrReal->get();
309 } else if (attr->getType() == typeid(Int)) {
310 auto attrInt = std::dynamic_pointer_cast<Attribute<Int>>(attr.getPtr());
311 sample->data[i].i = attrInt->get();
312 } else if (attr->getType() == typeid(Bool)) {
313 auto attrBool =
314 std::dynamic_pointer_cast<Attribute<Bool>>(attr.getPtr());
315 sample->data[i].b = attrBool->get();
316 } else if (attr->getType() == typeid(Complex)) {
317 auto attrComplex =
318 std::dynamic_pointer_cast<Attribute<Complex>>(attr.getPtr());
319 sample->data[i].z = std::complex<float>(attrComplex->get().real(),
320 attrComplex->get().imag());
321 } else {
322 SPDLOG_LOGGER_ERROR(mLog, "Error: Unsupported attribute type!");
323 throw RuntimeError("Unsupported attribute type!");
324 }
325 }
326
327 sample->length = mExportAttrsDpsim.size();
328 sample->sequence = mSequenceFromDpsim++;
329 sample->flags |= (int)villas::node::SampleFlags::HAS_SEQUENCE;
330 sample->flags |= (int)villas::node::SampleFlags::HAS_DATA;
331 clock_gettime(CLOCK_REALTIME, &sample->ts.origin);
332 sample->flags |= (int)villas::node::SampleFlags::HAS_TS_ORIGIN;
333
334 do {
335 ret = mNode->write(&sample, 1);
336 } while (ret == 0);
337 if (ret < 0)
338 SPDLOG_LOGGER_ERROR(mLog,
339 "Failed to write samples to InterfaceVillas. Write "
340 "returned code {}",
341 ret);
342
343 sample_decref(sample);
344 } catch (const std::exception &) {
345 sample_decref(sample);
346
347 if (ret < 0)
348 SPDLOG_LOGGER_ERROR(
349 mLog,
350 "Failed to write samples to InterfaceVillas. Write returned code {}",
351 ret);
352
353 // Don't throw here, because we managed to send something
354 }
355}
356
358 // Block on read until all attributes with syncOnSimulationStart are read
359 mSequenceToDpsim = this->readFromVillas();
360}
361
362void InterfaceVillasQueueless::syncExports() {
363 // Just push all the attributes
364 this->writeToVillas();
365}
366
367void InterfaceVillasQueueless::printVillasSignals() const {
368 SPDLOG_LOGGER_INFO(mLog, "Export signals:");
369 for (const auto &signal : *mNode->getOutputSignals(true)) {
370 SPDLOG_LOGGER_INFO(mLog, "Name: {}, Unit: {}, Type: {}", signal->name,
371 signal->unit, node::signalTypeToString(signal->type));
372 }
373
374 SPDLOG_LOGGER_INFO(mLog, "Import signals:");
375 for (const auto &signal : *mNode->getInputSignals(true)) {
376 SPDLOG_LOGGER_INFO(mLog, "Name: {}, Unit: {}, Type: {}", signal->name,
377 signal->unit, node::signalTypeToString(signal->type));
378 }
379}
380
381} // namespace DPsim
virtual void syncImports() override
Function called by the Simulation to perform interface synchronization.
InterfaceVillasQueueless(const String &nodeConfig, const String &name="", spdlog::level::level_enum logLevel=spdlog::level::level_enum::info)
create a new InterfaceVillasQueueless instance