ORIGINAL
Loading...
Searching...
No Matches
tasks.h
1#ifndef ORIGINAL_TASKS_H
2#define ORIGINAL_TASKS_H
3
4#include "async.h"
5#include "atomic.h"
6#include "queue.h"
7#include "refCntPtr.h"
8#include "array.h"
9#include "prique.h"
10#include "vector.h"
11
12namespace original {
13
14 // ==================== Task Base Interface ====================
15
/**
 * @class taskBase
 * @brief Abstract base class for all task types.
 * @details Type-erased handle through which a unit of work is executed:
 *          concrete tasks override run(), and owners may destroy them
 *          through a pointer to this base.
 */
class taskBase {
public:
    /**
     * @brief Executes the task.
     */
    virtual void run() = 0;

    /**
     * @brief Virtual destructor so derived tasks are destroyed correctly
     *        when deleted through a taskBase pointer.
     */
    virtual ~taskBase() = default;
};
33
34 // ==================== Concrete Task Class ====================
35
42 template<typename TYPE>
43 class task final : public taskBase {
44 async::promise<TYPE, std::function<TYPE()>> p;
45
46 public:
47 // Disable copying
48 task(const task&) = delete;
49 task& operator=(const task&) = delete;
50
51 // Allow moving
52 task(task&&) = default;
53 task& operator=(task&&) = default;
54
58 task() = default;
59
67 template<typename Callback, typename... Args>
68 explicit task(Callback&& c, Args&&... args);
69
73 void run() override;
74
80 };
81
82 // ==================== Task Delegator (Thread Pool) ====================
83
91 public:
96 enum class priority : u_integer {
97 IMMEDIATE = 0,
98 HIGH = 1,
99 NORMAL = 2,
100 LOW = 3,
101 DEFERRED = 4,
102 };
103
104 // Priority constants for convenience
105 static constexpr auto IMMEDIATE = priority::IMMEDIATE;
106 static constexpr auto HIGH = priority::HIGH;
107 static constexpr auto NORMAL = priority::NORMAL;
108 static constexpr auto LOW = priority::LOW;
109 static constexpr auto DEFERRED = priority::DEFERRED;
110
111 private:
112 // Internal type definitions
113 using priorityTask = couple<strongPtr<taskBase>, priority>;
114
120 template<typename COUPLE>
121 struct taskComparator {
128 bool operator()(const COUPLE& lhs, const COUPLE& rhs) const;
129 };
130
131 using priorityTaskQueue = prique<priorityTask, taskComparator, vector>;
132
133 // Member variables
134 array<thread> threads_;
135 priorityTaskQueue tasks_waiting_;
136 queue<strongPtr<taskBase>> task_immediate_;
137 queue<strongPtr<taskBase>> tasks_deferred_;
138 mutable pCondition condition_;
139 mutable pMutex mutex_;
140 bool stopped_;
141 u_integer active_threads_;
142 u_integer idle_threads_;
143
144 public:
149 explicit taskDelegator(u_integer thread_cnt = 8);
150
159 template<typename Callback, typename... Args>
160 auto submit(Callback&& c, Args&&... args);
161
171 template<typename Callback, typename... Args>
172 auto submit(priority priority, Callback&& c, Args&&... args);
173
180 template<typename TYPE>
182
190 template<typename TYPE>
192
196 void runDeferred();
197
201 void runAllDeferred();
202
206 void stop();
207
212 u_integer activeThreads() const noexcept;
213
218 u_integer idleThreads() const noexcept;
219
223 ~taskDelegator();
224 };
225} // namespace original
226
227template <typename TYPE>
228template <typename Callback, typename... Args>
229original::task<TYPE>::task(Callback&& c, Args&&... args)
230 : p([c = std::forward<Callback>(c), ...args = std::forward<Args>(args)]() mutable {
231 return c(args...);
232 }) {}
233
234template <typename TYPE>
236{
237 this->p.run();
238}
239
240template <typename TYPE>
242{
243 return this->p.getFuture();
244}
245
246template <typename COUPLE>
247bool original::taskDelegator::taskComparator<COUPLE>::operator()(const COUPLE& lhs, const COUPLE& rhs) const
248{
249 return static_cast<u_integer>(lhs.second()) < static_cast<u_integer>(rhs.second());
250}
251
253 : threads_(thread_cnt),
254 stopped_(false),
255 active_threads_(0),
256 idle_threads_(0) {
257 for (auto& thread_ : this->threads_) {
258 thread_ = thread {
259 [this]{
260 while (true) {
262 {
263 uniqueLock lock(this->mutex_);
264 this->idle_threads_ += 1;
265 this->condition_.wait(this->mutex_, [this] {
266 return this->stopped_ || !this->tasks_waiting_.empty() || !this->task_immediate_.empty();
267 });
268
269 if (this->stopped_ &&
270 this->tasks_waiting_.empty() &&
271 this->tasks_deferred_.empty() &&
272 this->task_immediate_.empty()) {
273 this->idle_threads_ -= 1;
274 return;
275 }
276
277 if (!this->task_immediate_.empty()) {
278 task = std::move(this->task_immediate_.pop());
279 } else {
280 task = std::move(this->tasks_waiting_.pop().first());
281 }
282 this->idle_threads_ -= 1;
283 }
284
285 {
286 uniqueLock lock(this->mutex_);
287 this->active_threads_ += 1;
288 }
289 task->run();
290 {
291 uniqueLock lock(this->mutex_);
292 this->active_threads_ -= 1;
293 }
294 }
295 }
296 };
297 }
298}
299
300template <typename Callback, typename ... Args>
301auto original::taskDelegator::submit(Callback&& c, Args&&... args)
302{
303 return this->submit(priority::NORMAL, std::forward<Callback>(c), std::forward<Args>(args)...);
304}
305
306template <typename Callback, typename ... Args>
307auto original::taskDelegator::submit(const priority priority, Callback&& c, Args&&... args)
308{
309 using ReturnType = decltype(c(args...));
310 auto new_task = makeStrongPtr<task<ReturnType>>(
311 std::forward<Callback>(c),
312 std::forward<Args>(args)...
313 );
314 return this->submit<ReturnType>(priority, new_task);
315}
316
317template <typename TYPE>
319{
320 return this->submit(priority::NORMAL, t);
321}
322
323template <typename TYPE>
325original::taskDelegator::submit(const priority priority, strongPtr<task<TYPE>>& t)
326{
327 auto f = t->getFuture();
328 {
329 uniqueLock lock(this->mutex_);
330 if (this->stopped_) {
331 throw sysError("taskDelegator already stopped");
332 }
333 switch (priority) {
334 case priority::IMMEDIATE:
335 if (this->idle_threads_ == 0) {
336 throw sysError("No idle threads now");
337 }
338 this->task_immediate_.push(std::move(t.template dynamicCastTo<taskBase>()));
339 break;
340 case priority::HIGH:
341 case priority::NORMAL:
342 case priority::LOW:
343 this->tasks_waiting_.push(priorityTask{t.template dynamicCastTo<taskBase>(), priority});
344 break;
345 case priority::DEFERRED:
346 this->tasks_deferred_.push(t.template dynamicCastTo<taskBase>());
347 return f;
348 default:
349 throw sysError("Unknown priority");
350 }
351 }
352 this->condition_.notify();
353 return f;
354}
355
357{
358 {
359 uniqueLock lock(this->mutex_);
360 if (!this->tasks_deferred_.empty()) {
361 this->tasks_waiting_.push(priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
362 } else {
363 return;
364 }
365 }
366 this->condition_.notify();
367}
368
370{
371 {
372 uniqueLock lock(this->mutex_);
373 if (this->tasks_deferred_.empty()) {
374 return;
375 }
376 while (!this->tasks_deferred_.empty()) {
377 this->tasks_waiting_.push(priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
378 }
379 }
380 this->condition_.notifyAll();
381}
382
384{
385 {
386 uniqueLock lock(this->mutex_);
387 this->stopped_ = true;
388 }
389 this->condition_.notifyAll();
390
391 for (auto& thread_ : this->threads_) {
392 if (thread_.joinable()) {
393 thread_.join();
394 }
395 }
396}
397
399{
400 uniqueLock lock(this->mutex_);
401 return this->active_threads_;
402}
403
405{
406 uniqueLock lock(this->mutex_);
407 return this->idle_threads_;
408}
409
411{
412 bool stopped;
413 {
414 uniqueLock lock(this->mutex_);
415 stopped = this->stopped_;
416 }
417 if (!stopped) {
418 this->stop();
419 }
420}
421
422#endif //ORIGINAL_TASKS_H
Provides the array class for a fixed-size container with random access.
Represents a future result of an asynchronous computation.
Definition async.h:100
Represents a promise of a future result.
Definition async.h:152
Container for two heterogeneous elements.
Definition couple.h:37
Shared ownership smart pointer with strong references.
Definition refCntPtr.h:108
Abstract base class for all task types.
Definition tasks.h:21
virtual ~taskBase()=default
Virtual destructor for proper polymorphism.
virtual void run()=0
Executes the task.
Thread pool for managing and executing tasks with priority.
Definition tasks.h:90
async::future< TYPE > submit(strongPtr< task< TYPE > > &t)
Submits a pre-created task with normal priority.
void stop()
Stops the task delegator and waits for completion.
Definition tasks.h:383
void runAllDeferred()
Moves all deferred tasks to the waiting queue.
Definition tasks.h:369
priority
Task priority levels for execution scheduling.
Definition tasks.h:96
@ NORMAL
Normal priority task (default)
@ IMMEDIATE
Execute immediately if threads available.
@ DEFERRED
Deferred execution (manual activation)
u_integer idleThreads() const noexcept
Gets the number of idle threads.
Definition tasks.h:404
u_integer activeThreads() const noexcept
Gets the number of active threads.
Definition tasks.h:398
auto submit(Callback &&c, Args &&... args)
Submits a task with normal priority.
Definition tasks.h:301
async::future< TYPE > submit(priority priority, strongPtr< task< TYPE > > &t)
Submits a pre-created task with specified priority.
taskDelegator(u_integer thread_cnt=8)
Constructs a task delegator with specified thread count.
Definition tasks.h:252
~taskDelegator()
Destructor - automatically stops if not already stopped.
Definition tasks.h:410
void runDeferred()
Moves one deferred task to the waiting queue.
Definition tasks.h:356
Concrete task implementation with result type.
Definition tasks.h:43
void run() override
Executes the task computation.
Definition tasks.h:235
async::future< TYPE > getFuture()
Gets the future associated with this task.
Definition tasks.h:241
task()=default
Default constructor.
High-level thread wrapper.
Definition thread.h:263
RAII wrapper for single mutex locking.
Definition mutex.h:216
std::uint32_t u_integer
32-bit unsigned integer type for sizes and indexes
Definition config.h:263
Main namespace for the project Original.
Definition algorithms.h:21
strongPtr< T, DEL > makeStrongPtr(Args &&... args)
Creates a new strongPtr managing a shared object.
Definition refCntPtr.h:633
Priority queue container implementation.
Queue container adapter implementation.
Reference-counted smart pointer hierarchy.
Dynamic array container with automatic resizing.