DPsim
Loading...
Searching...
No Matches
ThreadLevelScheduler.cpp
1/* Copyright 2017-2021 Institute for Automation of Complex Power Systems,
2 * EONERC, RWTH Aachen University
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
7 *********************************************************************************/
8
9#include <dpsim/ThreadLevelScheduler.h>
10
11#include <algorithm>
12#include <iostream>
13#include <typeinfo>
14
15using namespace CPS;
16using namespace DPsim;
17
/// Constructs a level-based thread scheduler.
/// @param threads              number of worker threads (forwarded to ThreadScheduler)
/// @param outMeasurementFile   file to write task-time measurements to (forwarded)
/// @param inMeasurementFile    file with previously recorded task times; if
///                             non-empty, createSchedule() uses it to balance load
/// @param useConditionVariable synchronization strategy flag (forwarded)
/// @param sortTaskTypes        if true, tasks within a level are grouped by
///                             dynamic type before distribution
ThreadLevelScheduler::ThreadLevelScheduler(Int threads,
                                           String outMeasurementFile,
                                           String inMeasurementFile,
                                           Bool useConditionVariable,
                                           Bool sortTaskTypes)
    : ThreadScheduler(threads, outMeasurementFile, useConditionVariable),
      mInMeasurementFile(inMeasurementFile), mSortTaskTypes(sortTaskTypes) {}
25
26void ThreadLevelScheduler::createSchedule(const Task::List &tasks,
27 const Edges &inEdges,
28 const Edges &outEdges) {
29 Task::List ordered;
30 std::vector<Task::List> levels;
31
32 Scheduler::topologicalSort(tasks, inEdges, outEdges, ordered);
33 Scheduler::initMeasurements(ordered);
34
35 Scheduler::levelSchedule(ordered, inEdges, outEdges, levels);
36
37 if (!mInMeasurementFile.empty()) {
38 std::unordered_map<String, TaskTime::rep> measurements;
39 readMeasurements(mInMeasurementFile, measurements);
40 for (size_t level = 0; level < levels.size(); level++) {
41 // Distribute tasks such that the execution time is (approximately) minimized
42 scheduleLevel(levels[level], measurements, inEdges);
43 }
44 } else {
45 for (size_t level = 0; level < levels.size(); level++) {
46 if (mSortTaskTypes)
47 sortTasksByType(levels[level].begin(), levels[level].end());
48 // Distribute tasks of one level evenly between threads
49 for (Int thread = 0; thread < mNumThreads; ++thread) {
50 Int start =
51 static_cast<Int>(levels[level].size()) * thread / mNumThreads;
52 Int end =
53 static_cast<Int>(levels[level].size()) * (thread + 1) / mNumThreads;
54 for (int idx = start; idx != end; idx++)
55 scheduleTask(thread, levels[level][idx]);
56 }
57 }
58 }
59
60 ThreadScheduler::finishSchedule(inEdges);
61}
62
/// Sorts the task range [begin, end) by the tasks' dynamic (RTTI) type so that
/// tasks of the same concrete class end up adjacent. The relative order of the
/// type groups is whatever std::type_info::before() yields — arbitrary but
/// consistent within a single program run.
/// @param begin start of the range to sort
/// @param end   one past the last task to sort
void ThreadLevelScheduler::sortTasksByType(Task::List::iterator begin,
                                           CPS::Task::List::iterator end) {
  // Compare two tasks by the ordering of their dynamic types.
  auto cmp = [](const Task::Ptr &p1, const Task::Ptr &p2) -> bool {
    // TODO: according to the standard, the ordering may change between invocations
    // clang complains here for some reason that the expressions in the typeid
    // might be evaluated (which is the whole point)
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
#endif
    // Dereferencing the pointers is deliberate: typeid on the pointee is a
    // polymorphic (runtime) query, typeid on the pointer type would not be.
    return typeid(*p1).before(typeid(*p2));
#ifdef __clang__
#pragma clang diagnostic pop
#endif
  };
  std::sort(begin, end, cmp);
}
80
81void ThreadLevelScheduler::scheduleLevel(
82 const Task::List &tasks,
83 const std::unordered_map<String, TaskTime::rep> &measurements,
84 const Edges &inEdges) {
85 Task::List tasksSorted = tasks;
86
87 // Check that measurements map is complete
88 for (auto task : tasks) {
89 if (measurements.find(task->toString()) == measurements.end())
90 throw SchedulingException();
91 }
92
93 if (mSortTaskTypes) {
94 TaskTime::rep totalTime = 0;
95 for (auto task : tasks) {
96 totalTime += measurements.at(task->toString());
97 }
98
99 TaskTime::rep avgTime = totalTime / mNumThreads;
100
101 // Sort the tasks by type and then push them to the threads in order
102 // while aiming for approximately equal execution time.
103 // Should work well enough for a large enough number of tasks
104 sortTasksByType(tasksSorted.begin(), tasksSorted.end());
105 size_t task = 0;
106 for (int thread = 0; thread < mNumThreads; thread++) {
107 TaskTime::rep curTime = 0;
108 while (curTime < avgTime && task < tasksSorted.size()) {
109 scheduleTask(thread, tasksSorted[task]);
110 curTime += measurements.at(tasksSorted[task]->toString());
111 task++;
112 }
113 }
114 // All tasks should be distributed, but just to be sure, put the remaining
115 // ones to the last thread
116 for (; task < tasksSorted.size(); task++)
117 scheduleTask(mNumThreads - 1, tasksSorted[task]);
118 } else {
119 // Sort tasks in descending execution time
120 auto cmp = [&measurements](const Task::Ptr &p1,
121 const Task::Ptr &p2) -> bool {
122 return measurements.at(p1->toString()) > measurements.at(p2->toString());
123 };
124 std::sort(tasksSorted.begin(), tasksSorted.end(), cmp);
125
126 // Greedy heuristic: schedule the tasks to the thread with the smallest current execution time
127 std::vector<TaskTime::rep> totalTimes(mNumThreads, 0);
128 for (auto task : tasksSorted) {
129 auto minIt = std::min_element(totalTimes.begin(), totalTimes.end());
130 Int minIdx = static_cast<UInt>(minIt - totalTimes.begin());
131 scheduleTask(minIdx, task);
132 totalTimes[minIdx] += measurements.at(task->toString());
133 }
134 }
135}
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
Definition Scheduler.cpp:45
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges
Definition Scheduler.h:31
void createSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges)
Creates the schedule for the given dependency graph.