DPsim
Loading...
Searching...
No Matches
Scheduler.h
1/* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2 * EONERC, RWTH Aachen University
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7 *********************************************************************************/
8
9#pragma once
10
11#include <dpsim-models/Task.h>
12
13#include <dpsim-models/Logger.h>
14#include <dpsim/Definitions.h>
15
16#include <atomic>
17#include <chrono>
18#include <condition_variable>
19#include <deque>
20#include <mutex>
21#include <unordered_map>
22
23namespace DPsim {
24// TODO extend / subclass
26
27class Scheduler {
28public:
31 typedef std::unordered_map<CPS::Task::Ptr, std::deque<CPS::Task::Ptr>> Edges;
33 typedef std::chrono::steady_clock::duration TaskTime;
34
36 Scheduler(CPS::Logger::Level logLevel = CPS::Logger::Level::off)
37 : mRoot(std::make_shared<Root>()),
38 // Logging
39 mLogLevel(logLevel), mSLog(CPS::Logger::get("scheduler", logLevel)) {
40 mSLog->set_pattern("[%L] %v");
41 }
43 virtual ~Scheduler() {}
44
45 // #### Interface functions ####
46
48 virtual void createSchedule(const CPS::Task::List &tasks,
49 const Edges &inEdges, const Edges &outEdges) = 0;
51 virtual void step(Real time, Int timeStepCount) = 0;
53 virtual void stop() {}
54
57 void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges);
58
59 // Special attribute that can be returned in the modified attributes of a task
60 // to mark that this task has external side-effects (like logging / interfacing)
61 // and thus has to be executed even though it doesn't modify any attribute.
62 // TODO is it really fine to use nullptr or should we create a special sentinel attribute?
63 static CPS::AttributeBase::Ptr external;
64
65 TaskTime getAveragedMeasurement(CPS::Task::Ptr task) {
66 return getAveragedMeasurement(task.get());
67 }
68
71 class Root : public CPS::Task {
72 public:
73 Root() : Task("Root") { mAttributeDependencies.push_back(external); }
74
75 void execute(Real time, Int timeStepCount) { throw SchedulingException(); }
76 };
77
78protected:
80 void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges,
81 const Edges &outEdges, CPS::Task::List &sortedTasks);
84 static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges,
85 const Edges &outEdges,
86 std::vector<CPS::Task::List> &levels);
87
88 void initMeasurements(const CPS::Task::List &tasks);
91 void updateMeasurement(CPS::Task *task, TaskTime time);
93 void writeMeasurements(CPS::String filename);
96 CPS::String filename,
97 std::unordered_map<CPS::String, TaskTime::rep> &measurements);
99 TaskTime getAveragedMeasurement(CPS::Task *task);
100
102 CPS::Task::Ptr mRoot;
104 CPS::Logger::Level mLogLevel;
106 CPS::Logger::Log mSLog;
107
108private:
109 // TODO more sophisticated measurement method might be necessary for
110 // longer simulations (risk of high memory requirements and integer
111 // overflow)
112 std::unordered_map<CPS::Task *, std::vector<TaskTime>> mMeasurements;
113};
114
118class Barrier {
119public:
121 Barrier() = delete;
124 Barrier(Int limit, Bool useCondition = false)
125 : mLimit(limit), mCount(0), mGeneration(0), mUseCondition(useCondition) {}
126
130 void wait() {
131 if (mUseCondition) {
132 std::unique_lock<std::mutex> lk(mMutex);
133 Int gen = mGeneration;
134 mCount++;
135 if (mCount == mLimit) {
136 mCount = 0;
137 mGeneration++;
138 lk.unlock();
139 mCondition.notify_all();
140 } else {
141 // necessary because of spurious wakeups
142 while (gen == mGeneration)
143 mCondition.wait(lk);
144 }
145 } else {
146 Int gen = mGeneration.load(std::memory_order_acquire);
147 // We need at least one release from each thread to ensure that
148 // every write from before the wait() is visible in every other thread,
149 // and the fetch needs to be an acquire anyway, so use acq_rel instead of acquire.
150 // (This generates the same code on x86.)
151 if (mCount.fetch_add(1, std::memory_order_acq_rel) == mLimit - 1) {
152 mCount.store(0, std::memory_order_relaxed);
153 mGeneration.fetch_add(1, std::memory_order_release);
154 } else {
155 while (mGeneration.load(std::memory_order_acquire) == gen)
156 ;
157 }
158 }
159 }
160
165 void signal() {
166 if (mUseCondition) {
167 std::unique_lock<std::mutex> lk(mMutex);
168 mCount++;
169 if (mCount == mLimit) {
170 mCount = 0;
171 mGeneration++;
172 lk.unlock();
173 mCondition.notify_all();
174 }
175 } else {
176 // No release here, as this call does not provide any synchronization anyway.
177 if (mCount.fetch_add(1, std::memory_order_acquire) == mLimit - 1) {
178 mCount.store(0, std::memory_order_relaxed);
179 mGeneration.fetch_add(1, std::memory_order_release);
180 }
181 }
182 }
183
184private:
186 Int mLimit;
188 std::atomic<Int> mCount;
190 std::atomic<Int> mGeneration;
191 Bool mUseCondition;
192
193 std::mutex mMutex;
194 std::condition_variable mCondition;
195};
196
197class BarrierTask : public CPS::Task {
198public:
199 typedef std::shared_ptr<BarrierTask> Ptr;
200 void addBarrier(Barrier *b);
201 void execute(Real time, Int timeStepCount);
202
203private:
204 std::vector<Barrier *> mBarriers;
205};
206
207class Counter {
208public:
209 Counter() : mValue(0) {}
210
211 void inc() { mValue.fetch_add(1, std::memory_order_release); }
212
213 void wait(Int value) {
214 while (mValue.load(std::memory_order_acquire) != value)
215 ;
216 }
217
218private:
219 std::atomic<Int> mValue;
220};
221} // namespace DPsim
Tasks to be defined by every component.
Definition Task.h:25
Barrier()=delete
Constructor without parameters is forbidden.
Barrier(Int limit, Bool useCondition=false)
Definition Scheduler.h:124
void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges)
Definition Scheduler.cpp:78
void writeMeasurements(CPS::String filename)
Write measurement data to file.
Definition Scheduler.cpp:32
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
CPS::Logger::Level mLogLevel
Log level.
Definition Scheduler.h:104
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition Scheduler.cpp:45
virtual void step(Real time, Int timeStepCount)=0
Performs a single simulation step.
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition Scheduler.h:31
virtual void createSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges)=0
Creates the schedule for the given dependency graph.
void updateMeasurement(CPS::Task *task, TaskTime time)
Definition Scheduler.cpp:28
virtual void stop()
Called on simulation stop to reliably clean up e.g. running helper threads.
Definition Scheduler.h:53
std::chrono::steady_clock::duration TaskTime
Time measurement for the task execution.
Definition Scheduler.h:33
CPS::Logger::Log mSLog
Logger.
Definition Scheduler.h:106