DPsim
Loading...
Searching...
No Matches
Scheduler.cpp
1/* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2 * EONERC, RWTH Aachen University
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7 *********************************************************************************/
8
9#include <dpsim/Scheduler.h>
10
11#include <fstream>
12#include <iostream>
13#include <unordered_map>
14#include <unordered_set>
15
16using namespace CPS;
17using namespace DPsim;
18
19CPS::AttributeBase::Ptr Scheduler::external;
20
21void Scheduler::initMeasurements(const Task::List &tasks) {
22 // Fill map here already since it's not protected by a mutex
23 for (auto task : tasks) {
24 mMeasurements[task.get()] = std::vector<TaskTime>();
25 }
26}
27
29 mMeasurements[ptr].push_back(time);
30}
31
32void Scheduler::writeMeasurements(String filename) {
33 std::ofstream os(filename);
34 std::unordered_map<String, TaskTime> averages;
35 for (auto &pair : mMeasurements) {
36 averages[pair.first->toString()] = getAveragedMeasurement(pair.first);
37 }
38 // TODO think of nicer output format
39 for (auto pair : averages) {
40 os << pair.first << "," << pair.second.count() << std::endl;
41 }
42 os.close();
43}
44
46 String filename, std::unordered_map<String, TaskTime::rep> &measurements) {
47 std::ifstream fs(filename);
48 if (!fs.good())
49 throw SchedulingException();
50
51 String name;
52 while (fs.good()) {
53 std::string line;
54 std::getline(fs, line);
55 int idx = static_cast<UInt>(line.find(','));
56 if (idx == -1) {
57 if (line.empty())
58 continue;
59 throw SchedulingException();
60 }
61 measurements[line.substr(0, idx)] = std::stol(line.substr(idx + 1));
62 }
63}
64
65Scheduler::TaskTime Scheduler::getAveragedMeasurement(CPS::Task *task) {
66 TaskTime avg(0), tot(0);
67
68 for (TaskTime time : mMeasurements[task]) {
69 tot += time;
70 }
71
72 if (!mMeasurements[task].empty())
73 avg = tot / mMeasurements[task].size();
74
75 return avg;
76}
77
// Builds the task dependency graph from the attribute dependencies of the tasks.
//
// mRoot is appended to `tasks` first. Afterwards, for every task `from` that
// modifies an attribute some task `to` depends on, `to` is appended to
// outEdges[from] and `from` is appended to inEdges[to]. A task that modifies
// an attribute listed as previous-step dependency of any task additionally
// gets an edge to mRoot.
//
// NOTE(review): this listing appears to be incomplete - the embedded line
// numbers jump 83 -> 85, 86 -> 88 and 90 -> 92, so the last template argument
// of both container declarations (and one further line) is not shown here.
// Confirm against the original Scheduler.cpp before editing this function.
78void Scheduler::resolveDeps(Task::List &tasks, Edges &inEdges,
79 Edges &outEdges) {
80 // Create graph (list of out/in edges for each node) from attribute dependencies
81 tasks.push_back(mRoot);
82 std::unordered_map<AttributeBase::Ptr, std::deque<Task::Ptr>,
83 std::hash<AttributeBase::Ptr>,
85 dependencies;
86 std::unordered_set<AttributeBase::Ptr, std::hash<AttributeBase::Ptr>,
88 prevStepDependencies;
// First pass: record for every attribute which tasks depend on it, and collect
// all attributes that are needed from the previous step.
89 for (auto task : tasks) {
90 for (AttributeBase::Ptr attr : task->getAttributeDependencies()) {
// Attributes other than Scheduler::external are expanded through
// getDependencies(); Scheduler::external is registered as-is.
92 if (attr.getPtr() != Scheduler::external.getPtr()) {
93 AttributeBase::Set attrDependencies = attr->getDependencies();
94 for (AttributeBase::Ptr dep : attrDependencies) {
95 dependencies[dep].push_back(task);
96 }
97 } else {
98 dependencies[attr].push_back(task);
99 }
100 }
101 for (AttributeBase::Ptr attr : task->getPrevStepDependencies()) {
102 prevStepDependencies.insert(attr);
103 }
104 }
105
// Second pass: connect every task to the tasks that depend on the attributes
// it modifies, and to mRoot if the attribute is a previous-step dependency.
106 for (auto from : tasks) {
107 for (AttributeBase::Ptr attr : from->getModifiedAttributes()) {
108 for (auto to : dependencies[attr]) {
109 outEdges[from].push_back(to);
110 inEdges[to].push_back(from);
111 }
112 if (prevStepDependencies.count(attr)) {
113 outEdges[from].push_back(mRoot);
114 inEdges[mRoot].push_back(from);
115 }
116 }
117 }
118}
119
120void Scheduler::topologicalSort(const Task::List &tasks, const Edges &inEdges,
121 const Edges &outEdges,
122 Task::List &sortedTasks) {
123 sortedTasks.clear();
124
125 // make copies of the edge lists because we modify them (and it makes
126 // things easier since empty lists are automatically created)
127 Edges inEdgesCpy = inEdges, outEdgesCpy = outEdges;
128
129 // do a breadth-first search backwards from the root node first to filter
130 // out unnecessary nodes
131 std::deque<Task::Ptr> q;
132 std::unordered_set<Task::Ptr> visited;
133
134 q.push_back(mRoot);
135 while (!q.empty()) {
136 auto t = q.front();
137 q.pop_front();
138 if (visited.count(t))
139 continue;
140
141 visited.insert(t);
142 for (auto dep : inEdgesCpy[t]) {
143 if (!visited.count(dep)) {
144 q.push_back(dep);
145 }
146 }
147 }
148
149 for (auto t : tasks) {
150 if (inEdgesCpy[t].empty()) {
151 q.push_back(t);
152 }
153 }
154 // keep list of tasks without incoming edges;
155 // iteratively remove such tasks from the graph and put them into the schedule
156 while (!q.empty()) {
157 Task::Ptr t = q.front();
158 q.pop_front();
159 if (!visited.count(t)) {
160 // don't put unneeded tasks in the schedule, but process them as usual
161 // so the cycle check still works
162 SPDLOG_LOGGER_INFO(mSLog, "Dropping {:s}", t->toString());
163 } else if (t != mRoot) {
164 sortedTasks.push_back(t);
165 }
166
167 for (auto after : outEdgesCpy[t]) {
168 for (auto edgeIt = inEdgesCpy[after].begin();
169 edgeIt != inEdgesCpy[after].end(); ++edgeIt) {
170 if (*edgeIt == t) {
171 inEdgesCpy[after].erase(edgeIt);
172 break;
173 }
174 }
175 if (inEdgesCpy[after].empty()) {
176 q.push_back(after);
177 }
178 }
179 outEdgesCpy.erase(t);
180 }
181
182 // sanity check: all edges should have been removed, otherwise
183 // the graph had a cycle
184 for (auto t : tasks) {
185 if (!outEdgesCpy[t].empty() || !inEdgesCpy[t].empty())
186 throw SchedulingException();
187 }
188}
189
190void Scheduler::levelSchedule(const Task::List &tasks, const Edges &inEdges,
191 const Edges &outEdges,
192 std::vector<Task::List> &levels) {
193 std::unordered_map<Task::Ptr, int> time;
194
195 for (auto task : tasks) {
196 if (inEdges.find(task) == inEdges.end() || inEdges.at(task).empty()) {
197 time[task] = 0;
198 } else {
199 int maxdist = 0;
200 for (auto before : inEdges.at(task)) {
201 if (time[before] > maxdist)
202 maxdist = time[before];
203 }
204 time[task] = maxdist + 1;
205 }
206 }
207
208 levels.clear();
209 levels.resize(time[tasks.back()] + 1);
210 for (auto task : tasks) {
211 levels[time[task]].push_back(task);
212 }
213}
214
215void BarrierTask::addBarrier(Barrier *b) { mBarriers.push_back(b); }
216
217void BarrierTask::execute(Real time, Int timeStepCount) {
218 if (mBarriers.size() == 1) {
219 mBarriers[0]->wait();
220 } else {
221 for (size_t i = 0; i < mBarriers.size() - 1; i++) {
222 mBarriers[i]->signal();
223 }
224 mBarriers[mBarriers.size() - 1]->wait();
225 }
226}
Tasks to be defined by every component.
Definition Task.h:25
void resolveDeps(CPS::Task::List &tasks, Edges &inEdges, Edges &outEdges)
Definition Scheduler.cpp:78
void writeMeasurements(CPS::String filename)
Write measurement data to file.
Definition Scheduler.cpp:32
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition Scheduler.cpp:45
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition Scheduler.h:31
void updateMeasurement(CPS::Task *task, TaskTime time)
Definition Scheduler.cpp:28
std::chrono::steady_clock::duration TaskTime
Time measurement for the task execution.
Definition Scheduler.h:33
CPS::Logger::Log mSLog
Logger.
Definition Scheduler.h:106