DPsim
Scheduler.cpp
1 /* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2  * EONERC, RWTH Aachen University
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7  *********************************************************************************/
8 
9 #include <dpsim/Scheduler.h>
10 
11 #include <fstream>
12 #include <iostream>
13 #include <unordered_map>
14 #include <unordered_set>
15 
16 using namespace CPS;
17 using namespace DPsim;
18 
19 CPS::AttributeBase::Ptr Scheduler::external;
20 
21 void Scheduler::initMeasurements(const Task::List &tasks) {
22  // Fill map here already since it's not protected by a mutex
23  for (auto task : tasks) {
24  mMeasurements[task.get()] = std::vector<TaskTime>();
25  }
26 }
27 
28 void Scheduler::updateMeasurement(Task *ptr, TaskTime time) {
29  mMeasurements[ptr].push_back(time);
30 }
31 
32 void Scheduler::writeMeasurements(String filename) {
33  std::ofstream os(filename);
34  std::unordered_map<String, TaskTime> averages;
35  for (auto &pair : mMeasurements) {
36  averages[pair.first->toString()] = getAveragedMeasurement(pair.first);
37  }
38  // TODO think of nicer output format
39  for (auto pair : averages) {
40  os << pair.first << "," << pair.second.count() << std::endl;
41  }
42  os.close();
43 }
44 
45 void Scheduler::readMeasurements(
46  String filename, std::unordered_map<String, TaskTime::rep> &measurements) {
47  std::ifstream fs(filename);
48  if (!fs.good())
49  throw SchedulingException();
50 
51  String name;
52  while (fs.good()) {
53  std::string line;
54  std::getline(fs, line);
55  int idx = static_cast<UInt>(line.find(','));
56  if (idx == -1) {
57  if (line.empty())
58  continue;
59  throw SchedulingException();
60  }
61  measurements[line.substr(0, idx)] = std::stol(line.substr(idx + 1));
62  }
63 }
64 
65 Scheduler::TaskTime Scheduler::getAveragedMeasurement(CPS::Task *task) {
66  TaskTime avg(0), tot(0);
67 
68  for (TaskTime time : mMeasurements[task]) {
69  tot += time;
70  }
71 
72  if (!mMeasurements[task].empty())
73  avg = tot / mMeasurements[task].size();
74 
75  return avg;
76 }
77 
78 void Scheduler::resolveDeps(Task::List &tasks, Edges &inEdges,
79  Edges &outEdges) {
80  // Create graph (list of out/in edges for each node) from attribute dependencies
81  tasks.push_back(mRoot);
82  std::unordered_map<AttributeBase::Ptr, std::deque<Task::Ptr>,
83  std::hash<AttributeBase::Ptr>,
85  dependencies;
86  std::unordered_set<AttributeBase::Ptr, std::hash<AttributeBase::Ptr>,
88  prevStepDependencies;
89  for (auto task : tasks) {
90  for (AttributeBase::Ptr attr : task->getAttributeDependencies()) {
92  if (attr.getPtr() != Scheduler::external.getPtr()) {
93  AttributeBase::Set attrDependencies = attr->getDependencies();
94  for (AttributeBase::Ptr dep : attrDependencies) {
95  dependencies[dep].push_back(task);
96  }
97  } else {
98  dependencies[attr].push_back(task);
99  }
100  }
101  for (AttributeBase::Ptr attr : task->getPrevStepDependencies()) {
102  prevStepDependencies.insert(attr);
103  }
104  }
105 
106  for (auto from : tasks) {
107  for (AttributeBase::Ptr attr : from->getModifiedAttributes()) {
108  for (auto to : dependencies[attr]) {
109  outEdges[from].push_back(to);
110  inEdges[to].push_back(from);
111  }
112  if (prevStepDependencies.count(attr)) {
113  outEdges[from].push_back(mRoot);
114  inEdges[mRoot].push_back(from);
115  }
116  }
117  }
118 }
119 
120 void Scheduler::topologicalSort(const Task::List &tasks, const Edges &inEdges,
121  const Edges &outEdges,
122  Task::List &sortedTasks) {
123  sortedTasks.clear();
124 
125  // make copies of the edge lists because we modify them (and it makes
126  // things easier since empty lists are automatically created)
127  Edges inEdgesCpy = inEdges, outEdgesCpy = outEdges;
128 
129  // do a breadth-first search backwards from the root node first to filter
130  // out unnecessary nodes
131  std::deque<Task::Ptr> q;
132  std::unordered_set<Task::Ptr> visited;
133 
134  q.push_back(mRoot);
135  while (!q.empty()) {
136  auto t = q.front();
137  q.pop_front();
138  if (visited.count(t))
139  continue;
140 
141  visited.insert(t);
142  for (auto dep : inEdgesCpy[t]) {
143  if (!visited.count(dep)) {
144  q.push_back(dep);
145  }
146  }
147  }
148 
149  for (auto t : tasks) {
150  if (inEdgesCpy[t].empty()) {
151  q.push_back(t);
152  }
153  }
154  // keep list of tasks without incoming edges;
155  // iteratively remove such tasks from the graph and put them into the schedule
156  while (!q.empty()) {
157  Task::Ptr t = q.front();
158  q.pop_front();
159  if (!visited.count(t)) {
160  // don't put unneeded tasks in the schedule, but process them as usual
161  // so the cycle check still works
162  SPDLOG_LOGGER_INFO(mSLog, "Dropping {:s}", t->toString());
163  } else if (t != mRoot) {
164  sortedTasks.push_back(t);
165  }
166 
167  for (auto after : outEdgesCpy[t]) {
168  for (auto edgeIt = inEdgesCpy[after].begin();
169  edgeIt != inEdgesCpy[after].end(); ++edgeIt) {
170  if (*edgeIt == t) {
171  inEdgesCpy[after].erase(edgeIt);
172  break;
173  }
174  }
175  if (inEdgesCpy[after].empty()) {
176  q.push_back(after);
177  }
178  }
179  outEdgesCpy.erase(t);
180  }
181 
182  // sanity check: all edges should have been removed, otherwise
183  // the graph had a cycle
184  for (auto t : tasks) {
185  if (!outEdgesCpy[t].empty() || !inEdgesCpy[t].empty())
186  throw SchedulingException();
187  }
188 }
189 
190 void Scheduler::levelSchedule(const Task::List &tasks, const Edges &inEdges,
191  const Edges &outEdges,
192  std::vector<Task::List> &levels) {
193  std::unordered_map<Task::Ptr, int> time;
194 
195  for (auto task : tasks) {
196  if (inEdges.find(task) == inEdges.end() || inEdges.at(task).empty()) {
197  time[task] = 0;
198  } else {
199  int maxdist = 0;
200  for (auto before : inEdges.at(task)) {
201  if (time[before] > maxdist)
202  maxdist = time[before];
203  }
204  time[task] = maxdist + 1;
205  }
206  }
207 
208  levels.clear();
209  levels.resize(time[tasks.back()] + 1);
210  for (auto task : tasks) {
211  levels[time[task]].push_back(task);
212  }
213 }
214 
215 void BarrierTask::addBarrier(Barrier *b) { mBarriers.push_back(b); }
216 
217 void BarrierTask::execute(Real time, Int timeStepCount) {
218  if (mBarriers.size() == 1) {
219  mBarriers[0]->wait();
220  } else {
221  for (size_t i = 0; i < mBarriers.size() - 1; i++) {
222  mBarriers[i]->signal();
223  }
224  mBarriers[mBarriers.size() - 1]->wait();
225  }
226 }
Tasks to be defined by every component.
Definition: Task.h:25
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition: Scheduler.h:31
std::chrono::steady_clock::duration TaskTime
Time measurement for the task execution.
Definition: Scheduler.h:33