6#include <dpsim/InterfaceQueued.h>
7#include <dpsim/InterfaceWorker.h>
// Opens the interface: calls open() on the interface worker, then starts a
// reader thread when mImportAttrsDpsim is non-empty and a writer thread when
// mExportAttrsDpsim is non-empty.
//
// NOTE(review): the embedded listing numbers jump from 14 to 17 and no closing
// braces appear in this block, so at least one original line is missing here.
// mOpened is handed to the reader thread below but nothing visible sets it;
// presumably the missing line does -- confirm against the upstream file.
13void InterfaceQueued::open() {
14 mInterfaceWorker->open();
// Reader side: the thread functor is constructed from the interface-to-DPsim
// queue, the worker and mOpened.
17 if (!mImportAttrsDpsim.empty()) {
18 mInterfaceReaderThread = std::thread(InterfaceQueued::ReaderThread(
19 mQueueInterfaceToDpsim, mInterfaceWorker, mOpened));
// Writer side: the thread functor is constructed from the DPsim-to-interface
// queue and the worker.
21 if (!mExportAttrsDpsim.empty()) {
22 mInterfaceWriterThread = std::thread(InterfaceQueued::WriterThread(
23 mQueueDpsimToInterface, mInterfaceWorker));
// Shuts the interface down: enqueues a PACKET_CLOSE_INTERFACE packet on the
// DPsim-to-interface queue, joins the writer and reader threads (each only
// when its attribute list is non-empty, mirroring the conditions under which
// open() starts them), and finally closes the interface worker.
//
// NOTE(review): listing line 28 (between the signature and the emplace call)
// is missing and no closing braces appear in this block; restore them from
// the upstream file. Nothing visible here signals the reader thread to stop
// before it is joined -- presumably the missing line does; confirm.
27void InterfaceQueued::close() {
// The close packet carries no value (nullptr) and zero ids; only its flag
// matters to the writer thread.
29 mQueueDpsimToInterface->emplace(AttributePacket{
30 nullptr, 0, 0, AttributePacketFlags::PACKET_CLOSE_INTERFACE});
32 if (!mExportAttrsDpsim.empty()) {
33 mInterfaceWriterThread.join();
36 if (!mImportAttrsDpsim.empty()) {
37 mInterfaceReaderThread.join();
// The worker is closed only after both threads have been joined.
39 mInterfaceWorker->close();
42CPS::Task::List InterfaceQueued::getTasks() {
43 auto tasks = CPS::Task::List();
44 if (!mImportAttrsDpsim.empty()) {
45 tasks.push_back(std::make_shared<InterfaceQueued::PreStep>(*
this));
47 if (!mExportAttrsDpsim.empty()) {
48 tasks.push_back(std::make_shared<InterfaceQueued::PostStep>(*
this));
53void InterfaceQueued::PreStep::execute(Real time, Int timeStepCount) {
54 if (timeStepCount % mIntf.mDownsampling == 0)
55 mIntf.popDpsimAttrsFromQueue();
58void InterfaceQueued::PostStep::execute(Real time, Int timeStepCount) {
59 if (timeStepCount % mIntf.mDownsampling == 0)
60 mIntf.pushDpsimAttrsToQueue();
63void InterfaceQueued::setLogger(CPS::Logger::Log log) {
64 Interface::setLogger(log);
65 if (mInterfaceWorker !=
nullptr) {
66 mInterfaceWorker->mLog = log;
72 this->popDpsimAttrsFromQueue(
true);
75void InterfaceQueued::syncExports() {
77 this->pushDpsimAttrsToQueue();
// Dequeues attribute packets from the interface-to-DPsim queue and applies
// them to the import attributes.
//
// Phase 1 (first while loop): keeps dequeuing as long as some import attribute
// selected by the predicate still has a sequence id below the value
// mNextSequenceInterfaceToDpsim had on entry.
// Phase 2 (last while loop): drains whatever else is queued without blocking.
//
// For every applied packet the value is copied onto the attribute indexed by
// packet.attributeId, the attribute's stored sequence id is updated, and
// mNextSequenceInterfaceToDpsim becomes packet.sequenceId + 1.
//
// NOTE(review): this block comes from a listing with dropped lines (embedded
// numbers 84-86, 90-91, 93, 95, 98, 100-101, 104, 106, 109, 111, 115-117, 121,
// 123 and everything after 126 are absent, as are all closing braces). The
// notes below mark where that affects reading; restore from upstream before
// compiling.
80void InterfaceQueued::popDpsimAttrsFromQueue(
bool isSync) {
81 AttributePacket receivedPacket = {
nullptr, 0, 0,
82 AttributePacketFlags::PACKET_NO_FLAGS};
// Snapshot of the expected sequence id; the predicate compares against this
// fixed value even though mNextSequenceInterfaceToDpsim is advanced below.
83 UInt currentSequenceId = mNextSequenceInterfaceToDpsim;
// The lambda receives each tuple by value (auto attrTuple), so every tuple is
// copied once per scan.
87 while (std::find_if(mImportAttrsDpsim.cbegin(), mImportAttrsDpsim.cend(),
88 [currentSequenceId, isSync](
auto attrTuple) {
89 auto &[_attr, seqId, blockOnRead, syncOnStart] =
// NOTE(review): the initializer of the binding and the branch headers are
// missing; presumably the first return applies when isSync is true and the
// second otherwise -- confirm.
92 return syncOnStart && seqId < currentSequenceId;
94 return blockOnRead && seqId < currentSequenceId;
96 }) != mImportAttrsDpsim.cend()) {
// A packet was already available: the inner loop keeps dequeuing into the same
// variable, so only the last packet dequeued is kept.
97 if (mQueueInterfaceToDpsim->try_dequeue(receivedPacket) !=
false) {
99 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
// NOTE(review): the declaration and update of the counter `i` are missing.
102 SPDLOG_LOGGER_WARN(mLog,
103 "Overrun detected! Discarding {} overrun packets!", i);
// NOTE(review): presumably the else branch of the try_dequeue check above
// (blocking wait when nothing was queued) -- the joining line is missing.
105 mQueueInterfaceToDpsim->wait_dequeue(receivedPacket);
// attributeId is used directly as an index into mImportAttrsDpsim; no bounds
// check is visible here.
107 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
108 ->copyValue(receivedPacket.value)) {
// NOTE(review): the logging macro name that opens this call is missing.
110 mLog,
"Failed to copy received value onto attribute in Interface!");
112 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
113 receivedPacket.sequenceId;
114 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
// Non-blocking drain of any remaining packets; same per-packet handling as
// above.
118 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
119 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])
120 ->copyValue(receivedPacket.value)) {
122 mLog,
"Failed to copy received value onto attribute in Interface!");
124 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) =
125 receivedPacket.sequenceId;
126 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
130void InterfaceQueued::pushDpsimAttrsToQueue() {
131 for (UInt i = 0; i < mExportAttrsDpsim.size(); i++) {
132 mQueueDpsimToInterface->emplace(AttributePacket{
133 std::get<0>(mExportAttrsDpsim[i])->cloneValueOntoNewAttribute(), i,
134 std::get<1>(mExportAttrsDpsim[i]),
135 AttributePacketFlags::PACKET_NO_FLAGS});
136 std::get<1>(mExportAttrsDpsim[i]) = mCurrentSequenceDpsimToInterface;
137 mCurrentSequenceDpsimToInterface++;
// Writer thread body: loops until a packet flagged PACKET_CLOSE_INTERFACE is
// seen. Each iteration blocks for one packet, then collects any further
// packets that are already queued, and passes the collected batch to the
// worker via writeValuesToEnv().
//
// NOTE(review): this block comes from a listing with dropped lines (embedded
// numbers 147-148, 152, 154-156, 160, 162-163 and everything after 164 are
// absent, as are all closing braces). Restore from upstream before compiling.
141void InterfaceQueued::WriterThread::operator()()
const {
142 bool interfaceClosed =
false;
// Declared outside the loop; no clear() is visible in this block, so whether
// the batch is emptied between iterations depends on writeValuesToEnv() or on
// a missing line -- confirm.
143 std::vector<InterfaceQueued::AttributePacket> attrsToWrite;
144 while (!interfaceClosed) {
145 AttributePacket nextPacket = {
nullptr, 0, 0,
146 AttributePacketFlags::PACKET_NO_FLAGS};
// Blocking wait for at least one packet.
149 mQueueDpsimToInterface->wait_dequeue(nextPacket);
150 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
151 interfaceClosed =
true;
// NOTE(review): the line joining the branch above to this push_back is
// missing; presumably an `else`, so the close packet itself is not written --
// confirm.
153 attrsToWrite.push_back(nextPacket);
// Non-blocking pickup of any further packets already queued.
157 while (mQueueDpsimToInterface->try_dequeue(nextPacket)) {
158 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
159 interfaceClosed =
true;
// NOTE(review): same missing joining line as above (presumably `else`).
161 attrsToWrite.push_back(nextPacket);
164 mInterfaceWorker->writeValuesToEnv(attrsToWrite);
// Reader thread body: asks the worker to read values into attrsRead and
// enqueues every packet read onto the interface-to-DPsim queue.
//
// NOTE(review): listing line 170 is missing (presumably the header of the loop
// that repeats the read), as is everything after line 173 including the
// closing braces. No clear() of attrsRead is visible in this block -- confirm
// against upstream whether the vector is emptied between reads.
168void InterfaceQueued::ReaderThread::operator()()
const {
169 std::vector<InterfaceQueued::AttributePacket> attrsRead;
171 mInterfaceWorker->readValuesFromEnv(attrsRead);
172 for (
const auto &packet : attrsRead) {
173 mQueueInterfaceToDpsim->enqueue(packet);
virtual void syncImports() override
Function called by the Simulation to perform interface synchronization.