DPsim
ThreadListScheduler.cpp
1 /* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2  * EONERC, RWTH Aachen University
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7  *********************************************************************************/
8 
#include <dpsim/ThreadListScheduler.h>

#include <algorithm>
#include <queue>
#include <utility>
12 
13 using namespace CPS;
14 using namespace DPsim;
15 
16 ThreadListScheduler::ThreadListScheduler(Int threads, String outMeasurementFile,
17  String inMeasurementFile,
18  Bool useConditionVariables)
19  : ThreadScheduler(threads, outMeasurementFile, useConditionVariables),
20  mInMeasurementFile(inMeasurementFile) {}
21 
22 void ThreadListScheduler::createSchedule(const Task::List &tasks,
23  const Edges &inEdges,
24  const Edges &outEdges) {
25  Task::List ordered;
26 
27  Scheduler::topologicalSort(tasks, inEdges, outEdges, ordered);
28  Scheduler::initMeasurements(ordered);
29 
30  std::unordered_map<Task::Ptr, int64_t> priorities;
31  std::unordered_map<String, TaskTime::rep> measurements;
32  if (!mInMeasurementFile.empty()) {
33  readMeasurements(mInMeasurementFile, measurements);
34 
35  // Check that measurements map is complete
36  for (auto task : ordered) {
37  if (measurements.find(task->toString()) == measurements.end())
38  throw SchedulingException();
39  }
40  } else {
41  // Insert constant cost for each task (HLFNET)
42  for (auto task : ordered) {
43  measurements[task->toString()] = 1;
44  }
45  }
46 
47  // HLFET
48  for (auto it = ordered.rbegin(); it != ordered.rend(); ++it) {
49  auto task = *it;
50  int64_t maxLevel = 0;
51  if (outEdges.find(task) != outEdges.end()) {
52  for (auto dep : outEdges.at(task)) {
53  if (priorities[dep] > maxLevel) {
54  maxLevel = priorities[dep];
55  }
56  }
57  }
58  priorities[task] = measurements.at(task->toString()) + maxLevel;
59  }
60 
61  auto cmp = [&priorities](const Task::Ptr &p1, const Task::Ptr &p2) -> bool {
62  return priorities[p1] < priorities[p2];
63  };
64  std::priority_queue<Task::Ptr, std::deque<Task::Ptr>, decltype(cmp)> queue(
65  cmp);
66  for (auto task : ordered) {
67  if (inEdges.find(task) == inEdges.end() || inEdges.at(task).empty()) {
68  queue.push(task);
69  } else {
70  break;
71  }
72  }
73 
74  std::vector<TaskTime::rep> totalTimes(mNumThreads, 0);
75  Edges inEdgesCpy = inEdges;
76  while (!queue.empty()) {
77  auto task = queue.top();
78  queue.pop();
79 
80  auto minIt = std::min_element(totalTimes.begin(), totalTimes.end());
81  Int minIdx = static_cast<UInt>(minIt - totalTimes.begin());
82  scheduleTask(minIdx, task);
83  totalTimes[minIdx] += measurements.at(task->toString());
84 
85  if (outEdges.find(task) != outEdges.end()) {
86  for (auto after : outEdges.at(task)) {
87  for (auto edgeIt = inEdgesCpy[after].begin();
88  edgeIt != inEdgesCpy[after].end(); ++edgeIt) {
89  if (*edgeIt == task) {
90  inEdgesCpy[after].erase(edgeIt);
91  break;
92  }
93  }
94  if (inEdgesCpy[after].empty() &&
95  std::find(ordered.begin(), ordered.end(), after) != ordered.end()) {
96  queue.push(after);
97  }
98  }
99  }
100  }
101 
102  ThreadScheduler::finishSchedule(inEdges);
103 }
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
Definition: Scheduler.cpp:120
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition: Scheduler.cpp:45
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition: Scheduler.h:31