DPsim
ThreadLevelScheduler.cpp
1 /* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2  * EONERC, RWTH Aachen University
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7  *********************************************************************************/
8 
9 #include <dpsim/ThreadLevelScheduler.h>
10 
11 #include <algorithm>
12 #include <iostream>
13 #include <typeinfo>
14 
15 using namespace CPS;
16 using namespace DPsim;
17 
18 ThreadLevelScheduler::ThreadLevelScheduler(Int threads,
19  String outMeasurementFile,
20  String inMeasurementFile,
21  Bool useConditionVariable,
22  Bool sortTaskTypes)
23  : ThreadScheduler(threads, outMeasurementFile, useConditionVariable),
24  mInMeasurementFile(inMeasurementFile), mSortTaskTypes(sortTaskTypes) {}
25 
26 void ThreadLevelScheduler::createSchedule(const Task::List &tasks,
27  const Edges &inEdges,
28  const Edges &outEdges) {
29  Task::List ordered;
30  std::vector<Task::List> levels;
31 
32  Scheduler::topologicalSort(tasks, inEdges, outEdges, ordered);
33  Scheduler::initMeasurements(ordered);
34 
35  Scheduler::levelSchedule(ordered, inEdges, outEdges, levels);
36 
37  if (!mInMeasurementFile.empty()) {
38  std::unordered_map<String, TaskTime::rep> measurements;
39  readMeasurements(mInMeasurementFile, measurements);
40  for (size_t level = 0; level < levels.size(); level++) {
41  // Distribute tasks such that the execution time is (approximately) minimized
42  scheduleLevel(levels[level], measurements, inEdges);
43  }
44  } else {
45  for (size_t level = 0; level < levels.size(); level++) {
46  if (mSortTaskTypes)
47  sortTasksByType(levels[level].begin(), levels[level].end());
48  // Distribute tasks of one level evenly between threads
49  for (Int thread = 0; thread < mNumThreads; ++thread) {
50  Int start =
51  static_cast<Int>(levels[level].size()) * thread / mNumThreads;
52  Int end =
53  static_cast<Int>(levels[level].size()) * (thread + 1) / mNumThreads;
54  for (int idx = start; idx != end; idx++)
55  scheduleTask(thread, levels[level][idx]);
56  }
57  }
58  }
59 
60  ThreadScheduler::finishSchedule(inEdges);
61 }
62 
63 void ThreadLevelScheduler::sortTasksByType(Task::List::iterator begin,
64  CPS::Task::List::iterator end) {
65  auto cmp = [](const Task::Ptr &p1, const Task::Ptr &p2) -> bool {
66  // TODO: according to the standard, the ordering may change between invocations
67  // clang complains here for some reason that the expressions in the typeid
68  // might be evaluated (which is the whole point)
69 #ifdef __clang__
70 #pragma clang diagnostic push
71 #pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
72 #endif
73  return typeid(*p1).before(typeid(*p2));
74 #ifdef __clang__
75 #pragma clang diagnostic pop
76 #endif
77  };
78  std::sort(begin, end, cmp);
79 }
80 
81 void ThreadLevelScheduler::scheduleLevel(
82  const Task::List &tasks,
83  const std::unordered_map<String, TaskTime::rep> &measurements,
84  const Edges &inEdges) {
85  Task::List tasksSorted = tasks;
86 
87  // Check that measurements map is complete
88  for (auto task : tasks) {
89  if (measurements.find(task->toString()) == measurements.end())
90  throw SchedulingException();
91  }
92 
93  if (mSortTaskTypes) {
94  TaskTime::rep totalTime = 0;
95  for (auto task : tasks) {
96  totalTime += measurements.at(task->toString());
97  }
98 
99  TaskTime::rep avgTime = totalTime / mNumThreads;
100 
101  // Sort the tasks by type and then push them to the threads in order
102  // while aiming for approximately equal execution time.
103  // Should work well enough for a large enough number of tasks
104  sortTasksByType(tasksSorted.begin(), tasksSorted.end());
105  size_t task = 0;
106  for (int thread = 0; thread < mNumThreads; thread++) {
107  TaskTime::rep curTime = 0;
108  while (curTime < avgTime && task < tasksSorted.size()) {
109  scheduleTask(thread, tasksSorted[task]);
110  curTime += measurements.at(tasksSorted[task]->toString());
111  task++;
112  }
113  }
114  // All tasks should be distributed, but just to be sure, put the remaining
115  // ones to the last thread
116  for (; task < tasksSorted.size(); task++)
117  scheduleTask(mNumThreads - 1, tasksSorted[task]);
118  } else {
119  // Sort tasks in descending execution time
120  auto cmp = [&measurements](const Task::Ptr &p1,
121  const Task::Ptr &p2) -> bool {
122  return measurements.at(p1->toString()) > measurements.at(p2->toString());
123  };
124  std::sort(tasksSorted.begin(), tasksSorted.end(), cmp);
125 
126  // Greedy heuristic: schedule the tasks to the thread with the smallest current execution time
127  std::vector<TaskTime::rep> totalTimes(mNumThreads, 0);
128  for (auto task : tasksSorted) {
129  auto minIt = std::min_element(totalTimes.begin(), totalTimes.end());
130  Int minIdx = static_cast<UInt>(minIt - totalTimes.begin());
131  scheduleTask(minIdx, task);
132  totalTimes[minIdx] += measurements.at(task->toString());
133  }
134  }
135 }
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
Definition: Scheduler.cpp:120
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
Definition: Scheduler.cpp:190
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition: Scheduler.cpp:45
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition: Scheduler.h:31