DPsim
ThreadScheduler.cpp
1 /* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2  * EONERC, RWTH Aachen University
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7  *********************************************************************************/
8 
9 #include <dpsim/ThreadScheduler.h>
10 
11 #include <iostream>
12 
13 using namespace CPS;
14 using namespace DPsim;
15 
16 ThreadScheduler::ThreadScheduler(Int threads, String outMeasurementFile,
17  Bool useConditionVariable)
18  : mNumThreads(threads), mOutMeasurementFile(outMeasurementFile),
19  mStartBarrier(threads, useConditionVariable) {
20  if (threads < 1)
21  throw SchedulingException();
22  mTempSchedules.resize(threads);
23  mSchedules.resize(threads, nullptr);
24 }
25 
26 ThreadScheduler::~ThreadScheduler() {
27  for (int i = 0; i < mNumThreads; i++)
28  delete[] mSchedules[i];
29 }
30 
31 void ThreadScheduler::scheduleTask(int thread, CPS::Task::Ptr task) {
32  mTempSchedules[thread].push_back(task);
33 }
34 
35 void ThreadScheduler::finishSchedule(const Edges &inEdges) {
36  std::map<CPS::Task::Ptr, Counter *> counters;
37  for (int thread = 0; thread < mNumThreads; thread++) {
38  // std::cout << "Thread " << thread << std::endl;
39  // for (auto& entry : mSchedules[thread]) {
40  // Task* t = entry.task.get();
41  // char *refpos = reinterpret_cast<char*>(reinterpret_cast<void*>(t)) + sizeof(Task);
42  // void *ref = *(reinterpret_cast<void**>(refpos));
43  // std::cout << entry.task->toString() << " " << ref << std::endl;
44  // }
45  mSchedules[thread] = new ScheduleEntry[mTempSchedules[thread].size()];
46  for (size_t i = 0; i < mTempSchedules[thread].size(); i++) {
47  auto &task = mTempSchedules[thread][i];
48  mSchedules[thread][i].task = task.get();
49  counters[task] = &mSchedules[thread][i].endCounter;
50  }
51  }
52  for (int thread = 0; thread < mNumThreads; thread++) {
53  for (size_t i = 0; i < mTempSchedules[thread].size(); i++) {
54  auto &task = mTempSchedules[thread][i];
55  if (inEdges.find(task) != inEdges.end()) {
56  for (auto req : inEdges.at(task)) {
57  mSchedules[thread][i].reqCounters.push_back(counters[req]);
58  }
59  }
60  }
61  }
62  for (int i = 1; i < mNumThreads; i++) {
63  mThreads.emplace_back(threadFunction, this, i);
64  }
65 }
66 
67 void ThreadScheduler::step(Real time, Int timeStepCount) {
68  mTime = time;
69  mTimeStepCount = timeStepCount;
70  mStartBarrier.wait();
71  doStep(0);
72  // since we don't have a final BarrierTask, wait for all threads to finish
73  // their last task explicitly
74  for (int thread = 1; thread < mNumThreads; thread++) {
75  if (mTempSchedules[thread].size() != 0)
76  mSchedules[thread][mTempSchedules[thread].size() - 1].endCounter.wait(
77  mTimeStepCount + 1);
78  }
79 }
80 
82  if (!mThreads.empty()) {
83  mJoining = true;
84  mStartBarrier.wait();
85  for (size_t thread = 0; thread < mThreads.size(); thread++) {
86  mThreads[thread].join();
87  }
88  }
89  if (!mOutMeasurementFile.empty()) {
90  writeMeasurements(mOutMeasurementFile);
91  }
92 }
93 
94 void ThreadScheduler::threadFunction(ThreadScheduler *sched, Int idx) {
95  while (true) {
96  sched->mStartBarrier.wait();
97  if (sched->mJoining)
98  return;
99 
100  sched->doStep(idx);
101  }
102 }
103 
104 void ThreadScheduler::doStep(Int thread) {
105  if (mOutMeasurementFile.empty()) {
106  for (size_t i = 0; i != mTempSchedules[thread].size(); i++) {
107  ScheduleEntry *entry = &mSchedules[thread][i];
108  for (Counter *counter : entry->reqCounters)
109  counter->wait(mTimeStepCount + 1);
110  entry->task->execute(mTime, mTimeStepCount);
111  entry->endCounter.inc();
112  }
113  } else {
114  for (size_t i = 0; i != mTempSchedules[thread].size(); i++) {
115  ScheduleEntry *entry = &mSchedules[thread][i];
116  for (Counter *counter : entry->reqCounters)
117  counter->wait(mTimeStepCount + 1);
118  auto start = std::chrono::steady_clock::now();
119  entry->task->execute(mTime, mTimeStepCount);
120  auto end = std::chrono::steady_clock::now();
121  updateMeasurement(entry->task, end - start);
122  entry->endCounter.inc();
123  }
124  }
125 }
void writeMeasurements(CPS::String filename)
Write measurement data to file.
Definition: Scheduler.cpp:32
void updateMeasurement(CPS::Task *task, TaskTime time)
Definition: Scheduler.cpp:28
virtual void stop()
Called on simulation stop to reliably clean up e.g. running helper threads.
void step(Real time, Int timeStepCount)
Performs a single simulation step.