DPsim
Scheduler.h
1 /* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2  * EONERC, RWTH Aachen University
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7  *********************************************************************************/
8 
9 #pragma once
10 
11 #include <dpsim-models/Task.h>
12 
13 #include <dpsim-models/Logger.h>
14 #include <dpsim/Definitions.h>
15 
16 #include <atomic>
17 #include <chrono>
18 #include <condition_variable>
19 #include <deque>
20 #include <mutex>
21 #include <unordered_map>
22 
23 namespace DPsim {
24 // TODO extend / subclass
26 
27 class Scheduler {
28 public:
31  typedef std::unordered_map<CPS::Task::Ptr, std::deque<CPS::Task::Ptr>> Edges;
33  typedef std::chrono::steady_clock::duration TaskTime;
34 
36  Scheduler(CPS::Logger::Level logLevel = CPS::Logger::Level::off)
37  : mRoot(std::make_shared<Root>()),
38  // Logging
39  mLogLevel(logLevel), mSLog(CPS::Logger::get("scheduler", logLevel)) {
40  mSLog->set_pattern("[%L] %v");
41  }
43  virtual ~Scheduler() {}
44 
45  // #### Interface functions ####
46 
48  virtual void createSchedule(const CPS::Task::List &tasks,
49  const Edges &inEdges, const Edges &outEdges) = 0;
51  virtual void step(Real time, Int timeStepCount) = 0;
53  virtual void stop() {}
54 
57  void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges);
58 
59  // Special attribute that can be returned in the modified attributes of a task
60  // to mark that this task has external side-effects (like logging / interfacing)
61  // and thus has to be executed even though it doesn't modify any attribute.
62  // TODO is it really fine to use nullptr or should we create a special sentinel attribute?
63  static CPS::AttributeBase::Ptr external;
64 
65  TaskTime getAveragedMeasurement(CPS::Task::Ptr task) {
66  return getAveragedMeasurement(task.get());
67  }
68 
71  class Root : public CPS::Task {
72  public:
73  Root() : Task("Root") { mAttributeDependencies.push_back(external); }
74 
75  void execute(Real time, Int timeStepCount) { throw SchedulingException(); }
76  };
77 
78 protected:
80  void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges,
81  const Edges &outEdges, CPS::Task::List &sortedTasks);
84  static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges,
85  const Edges &outEdges,
86  std::vector<CPS::Task::List> &levels);
87 
88  void initMeasurements(const CPS::Task::List &tasks);
91  void updateMeasurement(CPS::Task *task, TaskTime time);
93  void writeMeasurements(CPS::String filename);
95  void readMeasurements(
96  CPS::String filename,
97  std::unordered_map<CPS::String, TaskTime::rep> &measurements);
99  TaskTime getAveragedMeasurement(CPS::Task *task);
100 
102  CPS::Task::Ptr mRoot;
104  CPS::Logger::Level mLogLevel;
106  CPS::Logger::Log mSLog;
107 
108 private:
109  // TODO more sophisticated measurement method might be necessary for
110  // longer simulations (risk of high memory requirements and integer
111  // overflow)
112  std::unordered_map<CPS::Task *, std::vector<TaskTime>> mMeasurements;
113 };
114 
118 class Barrier {
119 public:
121  Barrier() = delete;
124  Barrier(Int limit, Bool useCondition = false)
125  : mLimit(limit), mCount(0), mGeneration(0), mUseCondition(useCondition) {}
126 
130  void wait() {
131  if (mUseCondition) {
132  std::unique_lock<std::mutex> lk(mMutex);
133  Int gen = mGeneration;
134  mCount++;
135  if (mCount == mLimit) {
136  mCount = 0;
137  mGeneration++;
138  lk.unlock();
139  mCondition.notify_all();
140  } else {
141  // necessary because of spurious wakeups
142  while (gen == mGeneration)
143  mCondition.wait(lk);
144  }
145  } else {
146  Int gen = mGeneration.load(std::memory_order_acquire);
147  // We need at least one release from each thread to ensure that
148  // every write from before the wait() is visible in every other thread,
149  // and the fetch needs to be an acquire anyway, so use acq_rel instead of acquire.
150  // (This generates the same code on x86.)
151  if (mCount.fetch_add(1, std::memory_order_acq_rel) == mLimit - 1) {
152  mCount.store(0, std::memory_order_relaxed);
153  mGeneration.fetch_add(1, std::memory_order_release);
154  } else {
155  while (mGeneration.load(std::memory_order_acquire) == gen)
156  ;
157  }
158  }
159  }
160 
165  void signal() {
166  if (mUseCondition) {
167  std::unique_lock<std::mutex> lk(mMutex);
168  mCount++;
169  if (mCount == mLimit) {
170  mCount = 0;
171  mGeneration++;
172  lk.unlock();
173  mCondition.notify_all();
174  }
175  } else {
176  // No release here, as this call does not provide any synchronization anyway.
177  if (mCount.fetch_add(1, std::memory_order_acquire) == mLimit - 1) {
178  mCount.store(0, std::memory_order_relaxed);
179  mGeneration.fetch_add(1, std::memory_order_release);
180  }
181  }
182  }
183 
184 private:
186  Int mLimit;
188  std::atomic<Int> mCount;
190  std::atomic<Int> mGeneration;
191  Bool mUseCondition;
192 
193  std::mutex mMutex;
194  std::condition_variable mCondition;
195 };
196 
197 class BarrierTask : public CPS::Task {
198 public:
199  typedef std::shared_ptr<BarrierTask> Ptr;
200  void addBarrier(Barrier *b);
201  void execute(Real time, Int timeStepCount);
202 
203 private:
204  std::vector<Barrier *> mBarriers;
205 };
206 
207 class Counter {
208 public:
209  Counter() : mValue(0) {}
210 
211  void inc() { mValue.fetch_add(1, std::memory_order_release); }
212 
213  void wait(Int value) {
214  while (mValue.load(std::memory_order_acquire) != value)
215  ;
216  }
217 
218 private:
219  std::atomic<Int> mValue;
220 };
221 } // namespace DPsim
Tasks to be defined by every component.
Definition: Task.h:25
Barrier()=delete
Constructor without parameters is forbidden.
void signal()
Definition: Scheduler.h:165
Barrier(Int limit, Bool useCondition=false)
Definition: Scheduler.h:124
void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges)
Definition: Scheduler.cpp:78
void writeMeasurements(CPS::String filename)
Write measurement data to file.
Definition: Scheduler.cpp:32
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
Definition: Scheduler.cpp:120
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
Definition: Scheduler.cpp:190
CPS::Logger::Level mLogLevel
Log level.
Definition: Scheduler.h:104
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition: Scheduler.cpp:45
virtual void step(Real time, Int timeStepCount)=0
Performs a single simulation step.
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition: Scheduler.h:31
virtual void createSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges)=0
Creates the schedule for the given dependency graph.
void updateMeasurement(CPS::Task *task, TaskTime time)
Definition: Scheduler.cpp:28
virtual void stop()
Called on simulation stop to reliably clean up e.g. running helper threads.
Definition: Scheduler.h:53
std::chrono::steady_clock::duration TaskTime
Time measurement for the task execution.
Definition: Scheduler.h:33
CPS::Logger::Log mSLog
Logger.
Definition: Scheduler.h:106