DPsim
Loading...
Searching...
No Matches
ThreadScheduler.cpp
1/* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2 * EONERC, RWTH Aachen University
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7 *********************************************************************************/
8
9#include <dpsim/ThreadScheduler.h>
10
11#include <iostream>
12
13using namespace CPS;
14using namespace DPsim;
15
16ThreadScheduler::ThreadScheduler(Int threads, String outMeasurementFile,
17 Bool useConditionVariable)
18 : mNumThreads(threads), mOutMeasurementFile(outMeasurementFile),
19 mStartBarrier(threads, useConditionVariable) {
20 if (threads < 1)
21 throw SchedulingException();
22 mTempSchedules.resize(threads);
23 mSchedules.resize(threads, nullptr);
24}
25
26ThreadScheduler::~ThreadScheduler() {
27 for (int i = 0; i < mNumThreads; i++)
28 delete[] mSchedules[i];
29}
30
31void ThreadScheduler::scheduleTask(int thread, CPS::Task::Ptr task) {
32 mTempSchedules[thread].push_back(task);
33}
34
35void ThreadScheduler::finishSchedule(const Edges &inEdges) {
36 std::map<CPS::Task::Ptr, Counter *> counters;
37 for (int thread = 0; thread < mNumThreads; thread++) {
38 // std::cout << "Thread " << thread << std::endl;
39 // for (auto& entry : mSchedules[thread]) {
40 // Task* t = entry.task.get();
41 // char *refpos = reinterpret_cast<char*>(reinterpret_cast<void*>(t)) + sizeof(Task);
42 // void *ref = *(reinterpret_cast<void**>(refpos));
43 // std::cout << entry.task->toString() << " " << ref << std::endl;
44 // }
45 mSchedules[thread] = new ScheduleEntry[mTempSchedules[thread].size()];
46 for (size_t i = 0; i < mTempSchedules[thread].size(); i++) {
47 auto &task = mTempSchedules[thread][i];
48 mSchedules[thread][i].task = task.get();
49 counters[task] = &mSchedules[thread][i].endCounter;
50 }
51 }
52 for (int thread = 0; thread < mNumThreads; thread++) {
53 for (size_t i = 0; i < mTempSchedules[thread].size(); i++) {
54 auto &task = mTempSchedules[thread][i];
55 if (inEdges.find(task) != inEdges.end()) {
56 for (auto req : inEdges.at(task)) {
57 mSchedules[thread][i].reqCounters.push_back(counters[req]);
58 }
59 }
60 }
61 }
62 for (int i = 1; i < mNumThreads; i++) {
63 mThreads.emplace_back(threadFunction, this, i);
64 }
65}
66
67void ThreadScheduler::step(Real time, Int timeStepCount) {
68 mTime = time;
69 mTimeStepCount = timeStepCount;
70 mStartBarrier.wait();
71 doStep(0);
72 // since we don't have a final BarrierTask, wait for all threads to finish
73 // their last task explicitly
74 for (int thread = 1; thread < mNumThreads; thread++) {
75 if (mTempSchedules[thread].size() != 0)
76 mSchedules[thread][mTempSchedules[thread].size() - 1].endCounter.wait(
77 mTimeStepCount + 1);
78 }
79}
80
82 if (!mThreads.empty()) {
83 mJoining = true;
84 mStartBarrier.wait();
85 for (size_t thread = 0; thread < mThreads.size(); thread++) {
86 mThreads[thread].join();
87 }
88 }
89 if (!mOutMeasurementFile.empty()) {
90 writeMeasurements(mOutMeasurementFile);
91 }
92}
93
94void ThreadScheduler::threadFunction(ThreadScheduler *sched, Int idx) {
95 while (true) {
96 sched->mStartBarrier.wait();
97 if (sched->mJoining)
98 return;
99
100 sched->doStep(idx);
101 }
102}
103
104void ThreadScheduler::doStep(Int thread) {
105 if (mOutMeasurementFile.empty()) {
106 for (size_t i = 0; i != mTempSchedules[thread].size(); i++) {
107 ScheduleEntry *entry = &mSchedules[thread][i];
108 for (Counter *counter : entry->reqCounters)
109 counter->wait(mTimeStepCount + 1);
110 entry->task->execute(mTime, mTimeStepCount);
111 entry->endCounter.inc();
112 }
113 } else {
114 for (size_t i = 0; i != mTempSchedules[thread].size(); i++) {
115 ScheduleEntry *entry = &mSchedules[thread][i];
116 for (Counter *counter : entry->reqCounters)
117 counter->wait(mTimeStepCount + 1);
118 auto start = std::chrono::steady_clock::now();
119 entry->task->execute(mTime, mTimeStepCount);
120 auto end = std::chrono::steady_clock::now();
121 updateMeasurement(entry->task, end - start);
122 entry->endCounter.inc();
123 }
124 }
125}
void writeMeasurements(CPS::String filename)
Write measurement data to file.
Definition Scheduler.cpp:32
void updateMeasurement(CPS::Task *task, TaskTime time)
Definition Scheduler.cpp:28
virtual void stop()
Called on simulation stop to reliably clean up e.g. running helper threads.
void step(Real time, Int timeStepCount)
Performs a single simulation step.