DPsim
InterfaceQueued.cpp
1 /* Author: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
2  * SPDX-FileCopyrightText: 2023-2024 Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
3  * SPDX-License-Identifier: MPL-2.0
4  */
5 
6 #include <dpsim/InterfaceQueued.h>
7 #include <dpsim/InterfaceWorker.h>
8 
9 using namespace CPS;
10 
11 namespace DPsim {
12 
13 void InterfaceQueued::open() {
14  mInterfaceWorker->open();
15  mOpened = true;
16 
17  if (!mImportAttrsDpsim.empty()) {
18  mInterfaceReaderThread = std::thread(InterfaceQueued::ReaderThread(mQueueInterfaceToDpsim, mInterfaceWorker, mOpened));
19  }
20  if (!mExportAttrsDpsim.empty()) {
21  mInterfaceWriterThread = std::thread(InterfaceQueued::WriterThread(mQueueDpsimToInterface, mInterfaceWorker));
22  }
23 }
24 
25 void InterfaceQueued::close() {
26  mOpened = false;
27  mQueueDpsimToInterface->emplace(AttributePacket{nullptr, 0, 0, AttributePacketFlags::PACKET_CLOSE_INTERFACE});
28 
29  if (!mExportAttrsDpsim.empty()) {
30  mInterfaceWriterThread.join();
31  }
32 
33  if (!mImportAttrsDpsim.empty()) {
34  mInterfaceReaderThread.join();
35  }
36  mInterfaceWorker->close();
37 }
38 
39 CPS::Task::List InterfaceQueued::getTasks() {
40  auto tasks = CPS::Task::List();
41  if (!mImportAttrsDpsim.empty()) {
42  tasks.push_back(std::make_shared<InterfaceQueued::PreStep>(*this));
43  }
44  if (!mExportAttrsDpsim.empty()) {
45  tasks.push_back(std::make_shared<InterfaceQueued::PostStep>(*this));
46  }
47  return tasks;
48 }
49 
50 void InterfaceQueued::PreStep::execute(Real time, Int timeStepCount) {
51  if (timeStepCount % mIntf.mDownsampling == 0)
52  mIntf.popDpsimAttrsFromQueue();
53 }
54 
55 void InterfaceQueued::PostStep::execute(Real time, Int timeStepCount) {
56  if (timeStepCount % mIntf.mDownsampling == 0)
57  mIntf.pushDpsimAttrsToQueue();
58 }
59 
60 void InterfaceQueued::setLogger(CPS::Logger::Log log) {
61  Interface::setLogger(log);
62  if (mInterfaceWorker != nullptr) {
63  mInterfaceWorker->mLog = log;
64  }
65 }
66 
67 void InterfaceQueued::syncImports() {
68  // Block on read until all attributes with syncOnSimulationStart are read
69  this->popDpsimAttrsFromQueue(true);
70 }
71 
72 void InterfaceQueued::syncExports() {
73  // Just push all the attributes
74  this->pushDpsimAttrsToQueue();
75 }
76 
77 void InterfaceQueued::popDpsimAttrsFromQueue(bool isSync) {
78  AttributePacket receivedPacket = {nullptr, 0, 0, AttributePacketFlags::PACKET_NO_FLAGS};
79  UInt currentSequenceId = mNextSequenceInterfaceToDpsim;
80 
81  // Wait for and dequeue all attributes that read should block on
82  // The std::find_if will look for all attributes that have not been updated in the current while loop (i. e. whose sequence ID is lower than the next expected sequence ID)
83  while (std::find_if(mImportAttrsDpsim.cbegin(), mImportAttrsDpsim.cend(), [currentSequenceId, isSync](auto attrTuple) {
84  auto &[_attr, seqId, blockOnRead, syncOnStart] = attrTuple;
85  if (isSync) {
86  return syncOnStart && seqId < currentSequenceId;
87  } else {
88  return blockOnRead && seqId < currentSequenceId;
89  }
90  }) != mImportAttrsDpsim.cend()) {
91  if (mQueueInterfaceToDpsim->try_dequeue(receivedPacket) != false) {
92  int i = 0;
93  while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
94  i++;
95  }
96  SPDLOG_LOGGER_WARN(mLog, "Overrun detected! Discarding {} overrun packets!", i);
97  } else {
98  mQueueInterfaceToDpsim->wait_dequeue(receivedPacket);
99  }
100  if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])->copyValue(receivedPacket.value)) {
101  SPDLOG_LOGGER_WARN(mLog, "Failed to copy received value onto attribute in Interface!");
102  }
103  std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) = receivedPacket.sequenceId;
104  mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
105  }
106 
107  // Fetch all remaining queue packets
108  while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
109  if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])->copyValue(receivedPacket.value)) {
110  SPDLOG_LOGGER_WARN(mLog, "Failed to copy received value onto attribute in Interface!");
111  }
112  std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) = receivedPacket.sequenceId;
113  mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
114  }
115 }
116 
117 void InterfaceQueued::pushDpsimAttrsToQueue() {
118  for (UInt i = 0; i < mExportAttrsDpsim.size(); i++) {
119  mQueueDpsimToInterface->emplace(AttributePacket{std::get<0>(mExportAttrsDpsim[i])->cloneValueOntoNewAttribute(), i, std::get<1>(mExportAttrsDpsim[i]), AttributePacketFlags::PACKET_NO_FLAGS});
120  std::get<1>(mExportAttrsDpsim[i]) = mCurrentSequenceDpsimToInterface;
121  mCurrentSequenceDpsimToInterface++;
122  }
123 }
124 
125 void InterfaceQueued::WriterThread::operator()() const {
126  bool interfaceClosed = false;
127  std::vector<InterfaceQueued::AttributePacket> attrsToWrite;
128  while (!interfaceClosed) {
129  AttributePacket nextPacket = {nullptr, 0, 0, AttributePacketFlags::PACKET_NO_FLAGS};
130 
131  // Wait for at least one packet
132  mQueueDpsimToInterface->wait_dequeue(nextPacket);
133  if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
134  interfaceClosed = true;
135  } else {
136  attrsToWrite.push_back(nextPacket);
137  }
138 
139  // See if there are more packets
140  while (mQueueDpsimToInterface->try_dequeue(nextPacket)) {
141  if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
142  interfaceClosed = true;
143  } else {
144  attrsToWrite.push_back(nextPacket);
145  }
146  }
147  mInterfaceWorker->writeValuesToEnv(attrsToWrite);
148  }
149 }
150 
151 void InterfaceQueued::ReaderThread::operator()() const {
152  std::vector<InterfaceQueued::AttributePacket> attrsRead;
153  while (mOpened) {
154  mInterfaceWorker->readValuesFromEnv(attrsRead);
155  for (const auto &packet : attrsRead) {
156  mQueueInterfaceToDpsim->enqueue(packet);
157  }
158  attrsRead.clear();
159  }
160 }
161 
162 } // namespace DPsim