DPsim
Loading...
Searching...
No Matches
InterfaceQueued.cpp
1/* Author: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
2 * SPDX-FileCopyrightText: 2023-2024 Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
3 * SPDX-License-Identifier: MPL-2.0
4 */
5
6#include <dpsim/InterfaceQueued.h>
7#include <dpsim/InterfaceWorker.h>
8
9using namespace CPS;
10
11namespace DPsim {
12
13void InterfaceQueued::open() {
14 mInterfaceWorker->open();
15 mOpened = true;
16
17 if (!mImportAttrsDpsim.empty()) {
18 mInterfaceReaderThread = std::thread(InterfaceQueued::ReaderThread(
19 mQueueInterfaceToDpsim, mInterfaceWorker, mOpened));
20 }
21 if (!mExportAttrsDpsim.empty()) {
22 mInterfaceWriterThread = std::thread(InterfaceQueued::WriterThread(
23 mQueueDpsimToInterface, mInterfaceWorker));
24 }
25}
26
27void InterfaceQueued::close() {
28 mOpened = false;
29 mQueueDpsimToInterface->emplace(AttributePacket{
30 nullptr, 0, 0, AttributePacketFlags::PACKET_CLOSE_INTERFACE});
31
32 if (!mExportAttrsDpsim.empty()) {
33 mInterfaceWriterThread.join();
34 }
35
36 if (!mImportAttrsDpsim.empty()) {
37 mInterfaceReaderThread.join();
38 }
39 mInterfaceWorker->close();
40}
41
42CPS::Task::List InterfaceQueued::getTasks() {
43 auto tasks = CPS::Task::List();
44 if (!mImportAttrsDpsim.empty()) {
45 tasks.push_back(std::make_shared<InterfaceQueued::PreStep>(*this));
46 }
47 if (!mExportAttrsDpsim.empty()) {
48 tasks.push_back(std::make_shared<InterfaceQueued::PostStep>(*this));
49 }
50 return tasks;
51}
52
53void InterfaceQueued::PreStep::execute(Real time, Int timeStepCount) {
54 if (timeStepCount % mIntf.mDownsampling == 0)
55 mIntf.popDpsimAttrsFromQueue();
56}
57
58void InterfaceQueued::PostStep::execute(Real time, Int timeStepCount) {
59 if (timeStepCount % mIntf.mDownsampling == 0)
60 mIntf.pushDpsimAttrsToQueue();
61}
62
63void InterfaceQueued::setLogger(CPS::Logger::Log log) {
64 Interface::setLogger(log);
65 if (mInterfaceWorker != nullptr) {
66 mInterfaceWorker->mLog = log;
67 }
68}
69
71 // Block on read until all attributes with syncOnSimulationStart are read
72 this->popDpsimAttrsFromQueue(true);
73}
74
75void InterfaceQueued::syncExports() {
76 // Just push all the attributes
77 this->pushDpsimAttrsToQueue();
78}
79
80void InterfaceQueued::popDpsimAttrsFromQueue(bool isSync) {
81 AttributePacket receivedPacket = {nullptr, 0, 0,
82 AttributePacketFlags::PACKET_NO_FLAGS};
83 UInt currentSequenceId = mNextSequenceInterfaceToDpsim;
84
85 // Wait for and dequeue all attributes that read should block on
86 // The std::find_if will look for all attributes that have not been updated in the current while loop (i. e. whose sequence ID is lower than the next expected sequence ID)
87 while (std::find_if(mImportAttrsDpsim.cbegin(), mImportAttrsDpsim.cend(),
88 [currentSequenceId, isSync](auto attrTuple) {
89 auto &[_attr, seqId, blockOnRead, syncOnStart] =
90 attrTuple;
91 if (isSync) {
92 return syncOnStart && seqId < currentSequenceId;
93 } else {
94 return blockOnRead && seqId < currentSequenceId;
95 }
96 }) != mImportAttrsDpsim.cend()) {
97 if (mQueueInterfaceToDpsim->try_dequeue(receivedPacket) != false) {
98 int i = 0;
99 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
100 i++;
101 }
102 SPDLOG_LOGGER_WARN(mLog,
103 "Overrun detected! Discarding {} overrun packets!", i);
104 } else {
105 mQueueInterfaceToDpsim->wait_dequeue(receivedPacket);
106 }
107 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
108 ->copyValue(receivedPacket.value)) {
109 SPDLOG_LOGGER_WARN(
110 mLog, "Failed to copy received value onto attribute in Interface!");
111 }
112 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
113 receivedPacket.sequenceId;
114 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
115 }
116
117 // Fetch all remaining queue packets
118 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
119 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
120 ->copyValue(receivedPacket.value)) {
121 SPDLOG_LOGGER_WARN(
122 mLog, "Failed to copy received value onto attribute in Interface!");
123 }
124 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
125 receivedPacket.sequenceId;
126 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
127 }
128}
129
130void InterfaceQueued::pushDpsimAttrsToQueue() {
131 for (UInt i = 0; i < mExportAttrsDpsim.size(); i++) {
132 mQueueDpsimToInterface->emplace(AttributePacket{
133 std::get<0>(mExportAttrsDpsim[i])->cloneValueOntoNewAttribute(), i,
134 std::get<1>(mExportAttrsDpsim[i]),
135 AttributePacketFlags::PACKET_NO_FLAGS});
136 std::get<1>(mExportAttrsDpsim[i]) = mCurrentSequenceDpsimToInterface;
137 mCurrentSequenceDpsimToInterface++;
138 }
139}
140
141void InterfaceQueued::WriterThread::operator()() const {
142 bool interfaceClosed = false;
143 std::vector<InterfaceQueued::AttributePacket> attrsToWrite;
144 while (!interfaceClosed) {
145 AttributePacket nextPacket = {nullptr, 0, 0,
146 AttributePacketFlags::PACKET_NO_FLAGS};
147
148 // Wait for at least one packet
149 mQueueDpsimToInterface->wait_dequeue(nextPacket);
150 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
151 interfaceClosed = true;
152 } else {
153 attrsToWrite.push_back(nextPacket);
154 }
155
156 // See if there are more packets
157 while (mQueueDpsimToInterface->try_dequeue(nextPacket)) {
158 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
159 interfaceClosed = true;
160 } else {
161 attrsToWrite.push_back(nextPacket);
162 }
163 }
164 mInterfaceWorker->writeValuesToEnv(attrsToWrite);
165 }
166}
167
168void InterfaceQueued::ReaderThread::operator()() const {
169 std::vector<InterfaceQueued::AttributePacket> attrsRead;
170 while (mOpened) {
171 mInterfaceWorker->readValuesFromEnv(attrsRead);
172 for (const auto &packet : attrsRead) {
173 mQueueInterfaceToDpsim->enqueue(packet);
174 }
175 attrsRead.clear();
176 }
177}
178
179} // namespace DPsim
virtual void syncImports() override
Function called by the Simulation to perform interface synchronization.