9 #include <dpsim/ThreadScheduler.h>
14 using namespace DPsim;
16 ThreadScheduler::ThreadScheduler(Int threads, String outMeasurementFile,
17 Bool useConditionVariable)
18 : mNumThreads(threads), mOutMeasurementFile(outMeasurementFile),
19 mStartBarrier(threads, useConditionVariable) {
22 mTempSchedules.resize(threads);
23 mSchedules.resize(threads,
nullptr);
26 ThreadScheduler::~ThreadScheduler() {
27 for (
int i = 0; i < mNumThreads; i++)
28 delete[] mSchedules[i];
31 void ThreadScheduler::scheduleTask(
int thread, CPS::Task::Ptr task) {
32 mTempSchedules[thread].push_back(task);
35 void ThreadScheduler::finishSchedule(
const Edges &inEdges) {
36 std::map<CPS::Task::Ptr, Counter *> counters;
37 for (
int thread = 0; thread < mNumThreads; thread++) {
45 mSchedules[thread] =
new ScheduleEntry[mTempSchedules[thread].size()];
46 for (
size_t i = 0; i < mTempSchedules[thread].size(); i++) {
47 auto &task = mTempSchedules[thread][i];
48 mSchedules[thread][i].task = task.get();
49 counters[task] = &mSchedules[thread][i].endCounter;
52 for (
int thread = 0; thread < mNumThreads; thread++) {
53 for (
size_t i = 0; i < mTempSchedules[thread].size(); i++) {
54 auto &task = mTempSchedules[thread][i];
55 if (inEdges.find(task) != inEdges.end()) {
56 for (
auto req : inEdges.at(task)) {
57 mSchedules[thread][i].reqCounters.push_back(counters[req]);
62 for (
int i = 1; i < mNumThreads; i++) {
63 mThreads.emplace_back(threadFunction,
this, i);
69 mTimeStepCount = timeStepCount;
74 for (
int thread = 1; thread < mNumThreads; thread++) {
75 if (mTempSchedules[thread].size() != 0)
76 mSchedules[thread][mTempSchedules[thread].size() - 1].endCounter.wait(
82 if (!mThreads.empty()) {
85 for (
size_t thread = 0; thread < mThreads.size(); thread++) {
86 mThreads[thread].join();
89 if (!mOutMeasurementFile.empty()) {
96 sched->mStartBarrier.
wait();
104 void ThreadScheduler::doStep(Int thread) {
105 if (mOutMeasurementFile.empty()) {
106 for (
size_t i = 0; i != mTempSchedules[thread].size(); i++) {
107 ScheduleEntry *entry = &mSchedules[thread][i];
108 for (
Counter *counter : entry->reqCounters)
109 counter->wait(mTimeStepCount + 1);
110 entry->task->execute(mTime, mTimeStepCount);
111 entry->endCounter.inc();
114 for (
size_t i = 0; i != mTempSchedules[thread].size(); i++) {
115 ScheduleEntry *entry = &mSchedules[thread][i];
116 for (
Counter *counter : entry->reqCounters)
117 counter->wait(mTimeStepCount + 1);
118 auto start = std::chrono::steady_clock::now();
119 entry->task->execute(mTime, mTimeStepCount);
120 auto end = std::chrono::steady_clock::now();
122 entry->endCounter.inc();
void writeMeasurements(CPS::String filename)
Write measurement data to file.
void updateMeasurement(CPS::Task *task, TaskTime time)
virtual void stop()
Called when the simulation stops, to reliably clean up resources such as running helper threads.
void step(Real time, Int timeStepCount)
Performs a single simulation step.