6 #include <dpsim/InterfaceQueued.h>
7 #include <dpsim/InterfaceWorker.h>
13 void InterfaceQueued::open() {
14 mInterfaceWorker->open();
17 if (!mImportAttrsDpsim.empty()) {
18 mInterfaceReaderThread = std::thread(InterfaceQueued::ReaderThread(mQueueInterfaceToDpsim, mInterfaceWorker, mOpened));
20 if (!mExportAttrsDpsim.empty()) {
21 mInterfaceWriterThread = std::thread(InterfaceQueued::WriterThread(mQueueDpsimToInterface, mInterfaceWorker));
25 void InterfaceQueued::close() {
27 mQueueDpsimToInterface->emplace(AttributePacket{
nullptr, 0, 0, AttributePacketFlags::PACKET_CLOSE_INTERFACE});
29 if (!mExportAttrsDpsim.empty()) {
30 mInterfaceWriterThread.join();
33 if (!mImportAttrsDpsim.empty()) {
34 mInterfaceReaderThread.join();
36 mInterfaceWorker->close();
39 CPS::Task::List InterfaceQueued::getTasks() {
40 auto tasks = CPS::Task::List();
41 if (!mImportAttrsDpsim.empty()) {
42 tasks.push_back(std::make_shared<InterfaceQueued::PreStep>(*
this));
44 if (!mExportAttrsDpsim.empty()) {
45 tasks.push_back(std::make_shared<InterfaceQueued::PostStep>(*
this));
50 void InterfaceQueued::PreStep::execute(Real time, Int timeStepCount) {
51 if (timeStepCount % mIntf.mDownsampling == 0)
52 mIntf.popDpsimAttrsFromQueue();
55 void InterfaceQueued::PostStep::execute(Real time, Int timeStepCount) {
56 if (timeStepCount % mIntf.mDownsampling == 0)
57 mIntf.pushDpsimAttrsToQueue();
60 void InterfaceQueued::setLogger(CPS::Logger::Log log) {
61 Interface::setLogger(log);
62 if (mInterfaceWorker !=
nullptr) {
63 mInterfaceWorker->mLog = log;
67 void InterfaceQueued::syncImports() {
69 this->popDpsimAttrsFromQueue(
true);
72 void InterfaceQueued::syncExports() {
74 this->pushDpsimAttrsToQueue();
77 void InterfaceQueued::popDpsimAttrsFromQueue(
bool isSync) {
78 AttributePacket receivedPacket = {
nullptr, 0, 0, AttributePacketFlags::PACKET_NO_FLAGS};
79 UInt currentSequenceId = mNextSequenceInterfaceToDpsim;
83 while (std::find_if(mImportAttrsDpsim.cbegin(), mImportAttrsDpsim.cend(), [currentSequenceId, isSync](
auto attrTuple) {
84 auto &[_attr, seqId, blockOnRead, syncOnStart] = attrTuple;
86 return syncOnStart && seqId < currentSequenceId;
88 return blockOnRead && seqId < currentSequenceId;
90 }) != mImportAttrsDpsim.cend()) {
91 if (mQueueInterfaceToDpsim->try_dequeue(receivedPacket) !=
false) {
93 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
96 SPDLOG_LOGGER_WARN(mLog,
"Overrun detected! Discarding {} overrun packets!", i);
98 mQueueInterfaceToDpsim->wait_dequeue(receivedPacket);
100 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])->copyValue(receivedPacket.value)) {
101 SPDLOG_LOGGER_WARN(mLog,
"Failed to copy received value onto attribute in Interface!");
103 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) = receivedPacket.sequenceId;
104 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
108 while (mQueueInterfaceToDpsim->try_dequeue(receivedPacket)) {
109 if (!std::get<0>(mImportAttrsDpsim[receivedPacket.attributeId])->copyValue(receivedPacket.value)) {
110 SPDLOG_LOGGER_WARN(mLog,
"Failed to copy received value onto attribute in Interface!");
112 std::get<1>(mImportAttrsDpsim[receivedPacket.attributeId]) = receivedPacket.sequenceId;
113 mNextSequenceInterfaceToDpsim = receivedPacket.sequenceId + 1;
117 void InterfaceQueued::pushDpsimAttrsToQueue() {
118 for (UInt i = 0; i < mExportAttrsDpsim.size(); i++) {
119 mQueueDpsimToInterface->emplace(AttributePacket{std::get<0>(mExportAttrsDpsim[i])->cloneValueOntoNewAttribute(), i, std::get<1>(mExportAttrsDpsim[i]), AttributePacketFlags::PACKET_NO_FLAGS});
120 std::get<1>(mExportAttrsDpsim[i]) = mCurrentSequenceDpsimToInterface;
121 mCurrentSequenceDpsimToInterface++;
125 void InterfaceQueued::WriterThread::operator()()
const {
126 bool interfaceClosed =
false;
127 std::vector<InterfaceQueued::AttributePacket> attrsToWrite;
128 while (!interfaceClosed) {
129 AttributePacket nextPacket = {
nullptr, 0, 0, AttributePacketFlags::PACKET_NO_FLAGS};
132 mQueueDpsimToInterface->wait_dequeue(nextPacket);
133 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
134 interfaceClosed =
true;
136 attrsToWrite.push_back(nextPacket);
140 while (mQueueDpsimToInterface->try_dequeue(nextPacket)) {
141 if (nextPacket.flags & AttributePacketFlags::PACKET_CLOSE_INTERFACE) {
142 interfaceClosed =
true;
144 attrsToWrite.push_back(nextPacket);
147 mInterfaceWorker->writeValuesToEnv(attrsToWrite);
// Body of the reader thread: asks the interface worker to read values from
// the environment into attrsRead and enqueues every packet found there on the
// interface-to-DPsim queue.
// NOTE(review): the remainder of this definition (any surrounding loop, its
// termination condition and the closing braces) is not visible in this view -
// confirm against the complete file before changing the code.
151 void InterfaceQueued::ReaderThread::operator()()
const {
// Buffer that readValuesFromEnv is given to fill with received packets.
152 std::vector<InterfaceQueued::AttributePacket> attrsRead;
154 mInterfaceWorker->readValuesFromEnv(attrsRead);
// Forward each packet that was read to the simulation side of the queue.
155 for (
const auto &packet : attrsRead) {
156 mQueueInterfaceToDpsim->enqueue(packet);