9 #include <dpsim/ThreadLevelScheduler.h>
16 using namespace DPsim;
// Constructor. The visible part of the initializer list stores
// inMeasurementFile in mInMeasurementFile and sortTaskTypes in mSortTaskTypes;
// the constructor body itself is empty.
// NOTE(review): this listing omits the lines between the useConditionVariable
// parameter and the visible initializers. The sortTaskTypes parameter and the
// start of the initializer list (presumably the base-class initialization that
// consumes threads, outMeasurementFile and useConditionVariable) are not
// shown here - confirm against the full file.
18 ThreadLevelScheduler::ThreadLevelScheduler(Int threads,
19 String outMeasurementFile,
20 String inMeasurementFile,
21 Bool useConditionVariable,
24 mInMeasurementFile(inMeasurementFile), mSortTaskTypes(sortTaskTypes) {}
// Builds the schedule level by level: the tasks of each entry in `levels` are
// assigned to threads, and ThreadScheduler::finishSchedule(inEdges) is called
// at the end.
// NOTE(review): several lines are missing from this listing. The declarations
// of `inEdges`, `ordered`, `start` and `end`, the code that fills `levels`, and
// the closing braces / branch structure are not visible here.
26 void ThreadLevelScheduler::createSchedule(
const Task::List &tasks,
28 const Edges &outEdges) {
// One task list per level; presumably filled by a level-scheduling helper in
// a line not shown here - TODO confirm.
30 std::vector<Task::List> levels;
33 Scheduler::initMeasurements(ordered);
// A non-empty input measurement file name selects the measurement-based
// distribution performed by scheduleLevel().
37 if (!mInMeasurementFile.empty()) {
// Measured time per task, keyed by a String. Presumably loaded from
// mInMeasurementFile in a line not shown here - TODO confirm.
38 std::unordered_map<String, TaskTime::rep> measurements;
40 for (
size_t level = 0; level < levels.size(); level++) {
42 scheduleLevel(levels[level], measurements, inEdges);
// Distribution by task count: each level is cut into mNumThreads contiguous
// index ranges. Presumably this loop is the branch taken when no measurement
// file is given - the `else` is not visible in this listing.
45 for (
size_t level = 0; level < levels.size(); level++) {
// NOTE(review): in the full file this sort may be guarded by a condition that
// is not visible here - confirm.
47 sortTasksByType(levels[level].begin(), levels[level].end());
49 for (Int thread = 0; thread < mNumThreads; ++thread) {
// The two expressions below are size * thread / mNumThreads and
// size * (thread + 1) / mNumThreads; presumably they initialize `start` and
// `end` (the left-hand sides are not visible in this listing).
51 static_cast<Int
>(levels[level].size()) * thread / mNumThreads;
53 static_cast<Int
>(levels[level].size()) * (thread + 1) / mNumThreads;
// Tasks with index in [start, end) of this level go to `thread`.
54 for (
int idx = start; idx != end; idx++)
55 scheduleTask(thread, levels[level][idx]);
60 ThreadScheduler::finishSchedule(inEdges);
// Sorts the tasks in [begin, end) in place with std::sort, using the order
// that std::type_info::before() defines on typeid(*task). That order is
// implementation-defined; it is used here only to decide which task comes
// first.
// NOTE(review): the closing tokens of the lambda and of the function are not
// visible in this listing.
63 void ThreadLevelScheduler::sortTasksByType(Task::List::iterator begin,
64 CPS::Task::List::iterator end) {
// Comparator: p1 goes first when the type_info of *p1 is "before" that of *p2.
65 auto cmp = [](
const Task::Ptr &p1,
const Task::Ptr &p2) ->
bool {
// The clang pragmas below silence -Wpotentially-evaluated-expression for the
// typeid(*p) operands and restore the previous diagnostic state afterwards.
70 #pragma clang diagnostic push
71 #pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
73 return typeid(*p1).before(
typeid(*p2));
75 #pragma clang diagnostic pop
78 std::sort(begin, end, cmp);
// Distributes the tasks of one level over the threads, using the measured
// time of each task looked up in `measurements` under task->toString().
// Two strategies are visible below: (1) sort by type and fill the threads in
// order up to the average time per thread, and (2) sort by descending time and
// always give the next task to the least-loaded thread.
// NOTE(review): the condition that selects between the two strategies
// (presumably mSortTaskTypes) is not visible in this listing, nor are the
// closing braces. `inEdges` is not used in any visible line.
81 void ThreadLevelScheduler::scheduleLevel(
82 const Task::List &tasks,
83 const std::unordered_map<String, TaskTime::rep> &measurements,
84 const Edges &inEdges) {
// Work on a copy: `tasks` is a const reference and the list is sorted below.
85 Task::List tasksSorted = tasks;
// Look up every task in `measurements`. The action taken when a task has no
// entry is on a line that is not visible here - TODO confirm.
88 for (
auto task : tasks) {
89 if (measurements.find(task->toString()) == measurements.end())
// Strategy 1: sum the measured times and derive the average time per thread.
94 TaskTime::rep totalTime = 0;
95 for (
auto task : tasks) {
96 totalTime += measurements.at(task->toString());
// The division truncates if TaskTime::rep is an integer type.
99 TaskTime::rep avgTime = totalTime / mNumThreads;
104 sortTasksByType(tasksSorted.begin(), tasksSorted.end());
// Walk the threads in order; each thread takes consecutive tasks until its
// accumulated time reaches avgTime or the tasks run out.
// NOTE(review): the declaration and the increment of the index `task` are not
// visible in this listing.
106 for (
int thread = 0; thread < mNumThreads; thread++) {
107 TaskTime::rep curTime = 0;
108 while (curTime < avgTime && task < tasksSorted.size()) {
109 scheduleTask(thread, tasksSorted[task]);
110 curTime += measurements.at(tasksSorted[task]->toString());
// Any tasks left over after the loop above go to the last thread.
116 for (; task < tasksSorted.size(); task++)
117 scheduleTask(mNumThreads - 1, tasksSorted[task]);
// Strategy 2: order the tasks by descending measured time.
120 auto cmp = [&measurements](
const Task::Ptr &p1,
121 const Task::Ptr &p2) ->
bool {
122 return measurements.at(p1->toString()) > measurements.at(p2->toString());
124 std::sort(tasksSorted.begin(), tasksSorted.end(), cmp);
// Greedy assignment: totalTimes[i] is the time accumulated on thread i so far;
// each task goes to the thread whose accumulated time is currently smallest.
127 std::vector<TaskTime::rep> totalTimes(mNumThreads, 0);
128 for (
auto task : tasksSorted) {
129 auto minIt = std::min_element(totalTimes.begin(), totalTimes.end());
// NOTE(review): the cast target is UInt but the result is stored in an Int.
130 Int minIdx =
static_cast<UInt
>(minIt - totalTimes.begin());
131 scheduleTask(minIdx, task);
132 totalTimes[minIdx] += measurements.at(task->toString());
void topologicalSort(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, CPS::Task::List &sortedTasks)
Simple topological sort, filtering out tasks that do not need to be executed.
static void levelSchedule(const CPS::Task::List &tasks, const Edges &inEdges, const Edges &outEdges, std::vector< CPS::Task::List > &levels)
void readMeasurements(CPS::String filename, std::unordered_map< CPS::String, TaskTime::rep > &measurements)
Read measurement data from file to use it for the scheduling.
std::unordered_map< CPS::Task::Ptr, std::deque< CPS::Task::Ptr > > Edges