DPsim
Interface.cpp
1 // SPDX-License-Identifier: Apache-2.0
2 
3 #include <dpsim/Interface.h>
4 #include <dpsim/InterfaceWorker.h>
5 
6 using namespace CPS;
7 
8 namespace DPsim {
9 
10 void Interface::open() {
11  mInterfaceWorker->open();
12  mOpened = true;
13 
14  if (!mImportAttrsDpsim.empty()) {
15  mInterfaceReaderThread = std::thread(Interface::ReaderThread(
16  mQueueInterfaceToDpsim, mInterfaceWorker, mOpened));
17  }
18  if (!mExportAttrsDpsim.empty()) {
19  mInterfaceWriterThread = std::thread(
20  Interface::WriterThread(mQueueDpsimToInterface, mInterfaceWorker));
21  }
22 }
23 
24 void Interface::close() {
25  mOpened = false;
26  mQueueDpsimToInterface->emplace(AttributePacket{
27  nullptr, 0, 0, AttributePacketFlags::PACKET_CLOSE_INTERFACE});
28 
29  if (!mExportAttrsDpsim.empty()) {
30  mInterfaceWriterThread.join();
31  }
32 
33  if (!mImportAttrsDpsim.empty()) {
34  mInterfaceReaderThread.join();
35  }
36  mInterfaceWorker->close();
37 }
38 
39 CPS::Task::List Interface::getTasks() {
40  auto tasks = CPS::Task::List();
41  if (!mImportAttrsDpsim.empty()) {
42  tasks.push_back(std::make_shared<Interface::PreStep>(*this));
43  }
44  if (!mExportAttrsDpsim.empty()) {
45  tasks.push_back(std::make_shared<Interface::PostStep>(*this));
46  }
47  return tasks;
48 }
49 
50 void Interface::PreStep::execute(Real time, Int timeStepCount) {
51  if (timeStepCount % mIntf.mDownsampling == 0)
52  mIntf.popDpsimAttrsFromQueue();
53 }
54 
55 void Interface::PostStep::execute(Real time, Int timeStepCount) {
56  if (timeStepCount % mIntf.mDownsampling == 0)
57  mIntf.pushDpsimAttrsToQueue();
58 }
59 
60 void Interface::addImport(CPS::AttributeBase::Ptr attr, bool blockOnRead,
61  bool syncOnSimulationStart) {
62  if (mOpened) {
63  SPDLOG_LOGGER_ERROR(
64  mLog, "Cannot modify interface configuration after simulation start!");
65  std::exit(1);
66  }
67 
68  mImportAttrsDpsim.emplace_back(attr, 0, blockOnRead, syncOnSimulationStart);
69 }
70 
71 void Interface::addExport(CPS::AttributeBase::Ptr attr) {
72  if (mOpened) {
73  SPDLOG_LOGGER_ERROR(
74  mLog, "Cannot modify interface configuration after simulation start!");
75  std::exit(1);
76  }
77 
78  mExportAttrsDpsim.emplace_back(attr, 0);
79 }
80 
81 void Interface::setLogger(CPS::Logger::Log log) {
82  mLog = log;
83  if (mInterfaceWorker != nullptr) {
84  mInterfaceWorker->mLog = log;
85  }
86 }
87 
88 void Interface::syncImports() {
89  //Block on read until all attributes with syncOnSimulationStart are read
90  this->popDpsimAttrsFromQueue(true);
91 }
92 
93 void Interface::syncExports() {
94  //Just push all the attributes
95  this->pushDpsimAttrsToQueue();
96 }
97 
98 void Interface::popDpsimAttrsFromQueue(bool isSync) {
99  AttributePacket receivedPacket = {nullptr, 0, 0,
100  AttributePacketFlags::PACKET_NO_FLAGS};
101  UInt currentSequenceId = mNextSequenceInterfaceToDpsim;
102 
103  //Wait for and dequeue all attributes that read should block on
104  //The std::find_if will look for all attributes that have not been updated in the current while loop (i. e. whose sequence ID is lower than the next expected sequence ID)
105  while (std::find_if(mImportAttrsDpsim.cbegin(), mImportAttrsDpsim.cend(),
106  [currentSequenceId, isSync](auto attrTuple) {
107  auto &[_attr, seqId, blockOnRead, syncOnStart] =
108  attrTuple;
109  if (isSync) {
110  return syncOnStart && seqId < currentSequenceId;
111  } else {
112  return blockOnRead && seqId < currentSequenceId;
113  }
114  }) != mImportAttrsDpsim.cend()) {
115  mQueueInterfaceToDpsim->wait_dequeue(receivedPacket);
116  if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
117  ->copyValue(receivedPacket.value)) {
118  SPDLOG_LOGGER_WARN(
119  mLog, "Failed to copy received value onto attribute in Interface!");
120  }
121  std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
122  receivedPacket.sequenceId;
123  mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
124  }
125 
126  //Fetch all remaining queue packets
127  while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
128  if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
129  ->copyValue(receivedPacket.value)) {
130  SPDLOG_LOGGER_WARN(
131  mLog, "Failed to copy received value onto attribute in Interface!");
132  }
133  std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
134  receivedPacket.sequenceId;
135  mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
136  }
137 }
138 
139 void Interface::pushDpsimAttrsToQueue() {
140  for (UInt i = 0; i < mExportAttrsDpsim.size(); i++) {
141  mQueueDpsimToInterface->emplace(AttributePacket{
142  std::get<0>(mExportAttrsDpsim[i])->cloneValueOntoNewAttribute(), i,
143  std::get<1>(mExportAttrsDpsim[i]),
144  AttributePacketFlags::PACKET_NO_FLAGS});
145  std::get<1>(mExportAttrsDpsim[i]) = mCurrentSequenceDpsimToInterface;
146  mCurrentSequenceDpsimToInterface++;
147  }
148 }
149 
150 void Interface::WriterThread::operator()() const {
151  bool interfaceClosed = false;
152  std::vector<Interface::AttributePacket> attrsToWrite;
153  while (!interfaceClosed) {
154  AttributePacket nextPacket = {nullptr, 0, 0,
155  AttributePacketFlags::PACKET_NO_FLAGS};
156 
157  //Wait for at least one packet
158  mQueueDpsimToInterface->wait_dequeue(nextPacket);
159  if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
160  interfaceClosed = true;
161  } else {
162  attrsToWrite.push_back(nextPacket);
163  }
164 
165  //See if there are more packets
166  while (mQueueDpsimToInterface->try_dequeue(nextPacket)) {
167  if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
168  interfaceClosed = true;
169  } else {
170  attrsToWrite.push_back(nextPacket);
171  }
172  }
173  mInterfaceWorker->writeValuesToEnv(attrsToWrite);
174  }
175 }
176 
177 void Interface::ReaderThread::operator()() const {
178  std::vector<Interface::AttributePacket> attrsRead;
179  while (mOpened) {
180  mInterfaceWorker->readValuesFromEnv(attrsRead);
181  for (const auto &packet : attrsRead) {
182  mQueueInterfaceToDpsim->enqueue(packet);
183  }
184  attrsRead.clear();
185  }
186 }
187 
188 } // namespace DPsim