11 #include <dpsim-models/Task.h>
13 #include <dpsim-models/Logger.h>
14 #include <dpsim/Definitions.h>
18 #include <condition_variable>
21 #include <unordered_map>
/// Adjacency map used for the task dependency graph: each task is mapped to a
/// deque of tasks connected to it. The scheduling functions in this listing
/// take two maps of this type, one for incoming and one for outgoing edges.
31 typedef std::unordered_map<CPS::Task::Ptr, std::deque<CPS::Task::Ptr>>
Edges;
/// Time measurement for the task execution, expressed as a
/// std::chrono::steady_clock duration.
33 typedef std::chrono::steady_clock::duration
TaskTime;
36 Scheduler(CPS::Logger::Level logLevel = CPS::Logger::Level::off)
37 : mRoot(std::make_shared<
Root>()),
40 mSLog->set_pattern(
"[%L] %v");
49 const Edges &inEdges,
const Edges &outEdges) = 0;
/// Performs a single simulation step.
/// Pure virtual: every concrete scheduler has to provide an implementation.
/// @param time          Real-valued argument for this step (presumably the
///                      current simulation time — confirm with callers).
/// @param timeStepCount Int-valued argument for this step (presumably the
///                      index of the step — confirm with callers).
51 virtual void step(Real time, Int timeStepCount) = 0;
/// Convenience overload for a task smart pointer: forwards to the
/// getAveragedMeasurement overload that accepts the result of task.get()
/// and returns that overload's TaskTime unchanged.
65 TaskTime getAveragedMeasurement(CPS::Task::Ptr task) {
66 return getAveragedMeasurement(task.get());
/// Constructs the task under the name "Root" and appends `external` to its
/// list of attribute dependencies.
73 Root() : Task(
"Root") { mAttributeDependencies.push_back(external); }
81 const Edges &outEdges, CPS::Task::List &sortedTasks);
85 const Edges &outEdges,
86 std::vector<CPS::Task::List> &levels);
/// Declaration only; the definition is not part of this listing.
/// NOTE(review): presumably prepares the per-task measurement storage
/// (mMeasurements) for the given tasks — confirm against the implementation.
/// @param tasks List of tasks, passed by const reference.
88 void initMeasurements(
const CPS::Task::List &tasks);
97 std::unordered_map<CPS::String, TaskTime::rep> &measurements);
/// Root task; the visible constructor initializer list sets it to a newly
/// created Root instance (std::make_shared<Root>()).
102 CPS::Task::Ptr mRoot;
/// Sequence of TaskTime values stored per task, keyed by raw task pointer.
112 std::unordered_map<CPS::Task *, std::vector<TaskTime>> mMeasurements;
125 : mLimit(limit), mCount(0), mGeneration(0), mUseCondition(useCondition) {}
// NOTE(review): the enclosing function header and several statements are not
// part of this listing; the two `gen` declarations below appear to belong to
// two alternative branches (mutex-guarded vs. atomic) — confirm in the source.
// Mutex-guarded path: lock mMutex via std::unique_lock.
132 std::unique_lock<std::mutex> lk(mMutex);
// Snapshot the generation so that a later change can be detected.
133 Int gen = mGeneration;
// Taken when the counter equals the configured limit.
135 if (mCount == mLimit) {
// Wake every thread currently blocked on mCondition.
139 mCondition.notify_all();
// Keep looping while the generation still equals the snapshot taken above.
142 while (gen == mGeneration)
// Atomic path: read the generation with acquire ordering before incrementing.
146 Int gen = mGeneration.load(std::memory_order_acquire);
// fetch_add returns the previous count, so `== mLimit - 1` identifies the
// caller whose increment brings the count up to mLimit.
151 if (mCount.fetch_add(1, std::memory_order_acq_rel) == mLimit - 1) {
// That caller resets the counter and then advances the generation with
// release ordering.
152 mCount.store(0, std::memory_order_relaxed);
153 mGeneration.fetch_add(1, std::memory_order_release);
// Loop until an acquire load observes a generation different from the snapshot.
155 while (mGeneration.load(std::memory_order_acquire) == gen)
// NOTE(review): the enclosing function header and several statements are not
// part of this listing. Unlike the similar code earlier in this class, no
// wait loop is visible here.
// Mutex-guarded path: lock mMutex via std::unique_lock.
167 std::unique_lock<std::mutex> lk(mMutex);
// Taken when the counter equals the configured limit.
169 if (mCount == mLimit) {
// Wake every thread currently blocked on mCondition.
173 mCondition.notify_all();
// Atomic path: fetch_add returns the previous count, so `== mLimit - 1`
// identifies the caller whose increment brings the count up to mLimit.
// NOTE(review): this increment uses memory_order_acquire, whereas the
// otherwise identical increment elsewhere in this class uses
// memory_order_acq_rel. With acquire only, writes made before this call by a
// caller that is NOT the last to arrive are not released by this operation,
// so threads released by the generation change may not be guaranteed to see
// them — confirm whether acq_rel (or release) is required here.
177 if (mCount.fetch_add(1, std::memory_order_acquire) == mLimit - 1) {
// That caller resets the counter and then advances the generation with
// release ordering.
178 mCount.store(0, std::memory_order_relaxed);
179 mGeneration.fetch_add(1, std::memory_order_release);
/// Counter that is compared against mLimit; atomic, and incremented with
/// fetch_add on the paths that do not take the mutex.
188 std::atomic<Int> mCount;
/// Generation number; on the visible atomic paths it is incremented right
/// after the counter has been reset to 0.
190 std::atomic<Int> mGeneration;
/// Condition variable on which notify_all() is called in the mutex-guarded
/// paths when the counter equals the limit.
194 std::condition_variable mCondition;
/// Shared-pointer alias for BarrierTask.
199 typedef std::shared_ptr<BarrierTask> Ptr;
/// Declaration only; the definition is not part of this listing.
/// Takes the same (Real time, Int timeStepCount) arguments as Scheduler::step.
201 void execute(Real time, Int timeStepCount);
/// Raw pointers to Barrier objects (ownership is not visible here).
/// NOTE(review): presumably the barriers that execute() operates on — confirm.
204 std::vector<Barrier *> mBarriers;
/// Atomically increments mValue by one using release ordering.
211 void inc() { mValue.fetch_add(1, std::memory_order_release); }
/// Loops for as long as an acquire load of mValue differs from `value`.
/// The loop body and the end of the function are not part of this listing.
/// @param value The counter value that ends the loop once it is observed.
213 void wait(Int value) {
214 while (mValue.load(std::memory_order_acquire) != value)
/// Atomic counter that inc() increments and wait() polls.
219 std::atomic<Int> mValue;
Tasks to be defined by every component.
Barrier()=delete
Constructor without parameters is forbidden.
Barrier(Int limit, Bool useCondition=false)
void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges)
void writeMeasurements(CPS::String filename)
Write measurement data to file.
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
CPS::Logger::Level mLogLevel
Log level.
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
virtual void step(Real time, Int timeStepCount)=0
Performs a single simulation step.
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
virtual void createSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges)=0
Creates the schedule for the given dependency graph.
void updateMeasurement(CPS::Task *task, TaskTime time)
virtual void stop()
Called when the simulation stops to reliably clean up resources, e.g. running helper threads.
std::chrono::steady_clock::duration TaskTime
Time measurement for the task execution.
CPS::Logger::Log mSLog
Logger.