ORIGINAL
Loading...
Searching...
No Matches
tasks.h
Go to the documentation of this file.
1
26#ifndef ORIGINAL_TASKS_H
27#define ORIGINAL_TASKS_H
28
29#include "async.h"
30#include "atomic.h"
31#include "queue.h"
32#include "refCntPtr.h"
33#include "array.h"
34#include "prique.h"
35#include "vector.h"
36
37namespace original {
38
39 // ==================== Task Delegator (Thread Pool) ====================
40
50 public:
51 // ==================== Task Base Interface ====================
52
/// Abstract base class for all tasks; the pool stores and runs work through this interface.
60 class taskBase {
61 public:
/// Executes the task. Concrete task types must override this.
65 virtual void run() = 0;
66
/// Virtual destructor for proper polymorphic destruction through a taskBase pointer.
70 virtual ~taskBase() = default;
71 };
72
73 // ==================== Concrete Task Class ====================
74
/// Concrete task implementation with future/promise support.
/// @tparam TYPE result type produced by the wrapped callable.
// NOTE(review): this rendered listing drops several source lines (107-112 are absent here).
// The page's tooltip index mentions a getFuture() member defined at tasks.h:329 whose
// declaration is not visible in this class body; confirm against the real header.
82 template<typename TYPE>
83 class task final : public taskBase {
/// Promise parameterised on TYPE and a std::function<TYPE()> callback; run() calls p.run().
84 async::promise<TYPE, std::function<TYPE()>> p;
85
86 public:
/// Copying is disabled.
87 task(const task&) = delete;
88 task& operator=(const task&) = delete;
/// Moving is allowed (defaulted).
89 task(task&&) = default;
90 task& operator=(task&&) = default;
/// Default construction is allowed (defaulted).
91 task() = default;
92
/// Builds a task from a callable and the arguments it should later be invoked with.
/// @param c    callable to execute
/// @param args arguments passed to c when the task runs
100 template<typename Callback, typename... Args>
101 explicit task(Callback&& c, Args&&... args);
102
/// Executes the task (overrides taskBase::run).
106 void run() override;
107
113 };
114
115 // ==================== Task Priorities ====================
116
/// Task priority levels for execution scheduling.
/// The private submit() overload switches on these values to pick the destination queue.
121 enum class priority : u_integer {
/// Execute immediately if threads are available; submission throws when no thread is idle.
122 IMMEDIATE = 0,
/// Placed in the prioritized waiting queue.
123 HIGH = 1,
/// Normal priority (used by submit() when no priority is given); placed in the waiting queue.
124 NORMAL = 2,
/// Placed in the prioritized waiting queue.
125 LOW = 3,
/// Deferred execution: parked in the deferred queue until activated or discarded.
126 DEFERRED = 4,
127 };
128
/// Stop behavior for deferred tasks; consumed by stop().
133 enum class stopMode {
/// Discard deferred tasks (the deferred queue is cleared).
134 DISCARD_DEFERRED,
/// Keep deferred tasks (the deferred queue is left untouched).
135 KEEP_DEFERRED,
/// Execute all deferred tasks before stopping (they are moved to the waiting queue).
136 RUN_DEFERRED,
137 };
138
139 // Convenience constants
140 static constexpr auto IMMEDIATE = priority::IMMEDIATE;
141 static constexpr auto HIGH = priority::HIGH;
142 static constexpr auto NORMAL = priority::NORMAL;
143 static constexpr auto LOW = priority::LOW;
144 static constexpr auto DEFERRED = priority::DEFERRED;
145
146 static constexpr auto DISCARD_DEFERRED = stopMode::DISCARD_DEFERRED;
147 static constexpr auto KEEP_DEFERRED = stopMode::KEEP_DEFERRED;
148 static constexpr auto RUN_DEFERRED = stopMode::RUN_DEFERRED;
149
150 private:
151 // Internal type definitions
152 using priorityTask = couple<strongPtr<taskBase>, priority>;
153
/// Comparator supplied to the priority queue of waiting tasks.
/// @tparam COUPLE element type being compared; the out-of-class definition reads its second()
///         member as the priority.
158 template<typename COUPLE>
159 struct taskComparator {
/// @return true when lhs's priority value is numerically less than rhs's.
// NOTE(review): whether "less" means "served first" depends on prique's ordering contract,
// which is not visible in this file; confirm.
166 bool operator()(const COUPLE& lhs, const COUPLE& rhs) const;
167 };
168
/// Queue type for waiting tasks: a prique of priorityTask ordered by taskComparator.
// NOTE(review): the third template argument (vector) is presumably the underlying container; confirm.
169 using priorityTaskQueue = prique<priorityTask, taskComparator, vector>;
170
/// Worker threads; each slot is assigned a running thread in the constructor and joined in the destructor.
171 array<thread> threads_;
/// Tasks submitted with HIGH/NORMAL/LOW priority, plus deferred tasks once they are activated.
172 priorityTaskQueue tasks_waiting_;
/// Tasks that workers pick before anything in tasks_waiting_.
173 queue<strongPtr<taskBase>> task_immediate_;
/// Tasks submitted with DEFERRED priority, parked until activated or discarded.
174 queue<strongPtr<taskBase>> tasks_deferred_;
/// Notified when work is queued or the pool is stopped; the timeout submit() also waits on it.
175 mutable condition condition_;
/// Locked around every access to the queues, stopped_ and both thread counters in this file.
176 mutable mutex mutex_;
/// Set by stop(); later submissions throw and workers exit once both runnable queues are empty.
177 bool stopped_;
/// Number of workers currently executing a task.
178 u_integer active_threads_;
/// Number of workers currently waiting for work.
179 u_integer idle_threads_;
180
/// Internal submission: enqueues an already-constructed task according to its priority.
/// @tparam TYPE   result type of the task
/// @param priority scheduling priority that selects the destination queue
/// @param t        task to enqueue
/// @return future associated with the task
188 template<typename TYPE>
189 async::future<TYPE> submit(priority priority, strongPtr<task<TYPE>>& t);
190
191 public:
// NOTE(review): this rendered listing drops several declarations between the visible ones
// (the bare line numbers below mark the gaps); the tooltip index lists further members such as
// waitingCnt(), immediateCnt(), discardDeferred(), deferredCnt(), activeThreads(), idleThreads().
/// Copy construction is disabled.
192 taskDelegator(const taskDelegator&) = delete;
196
/// Creates the pool and starts its worker threads.
/// @param thread_cnt number of worker threads (defaults to 8)
201 explicit taskDelegator(u_integer thread_cnt = 8);
202
/// Submits a callable with NORMAL priority.
/// @return future for the callable's result
211 template<typename Callback, typename... Args>
212 auto submit(Callback&& c, Args&&... args);
213
/// Submits a callable with an explicit priority.
/// @return future for the callable's result
226 template<typename Callback, typename... Args>
227 auto submit(priority priority, Callback&& c, Args&&... args);
228
/// Submits a callable to the immediate queue, waiting up to timeout for an idle thread.
/// @return future for the callable's result
244 template<typename Callback, typename... Args>
245 auto submit(time::duration timeout, Callback&& c, Args&&... args);
246
251
256
/// Activates one deferred task.
260 void runDeferred();
261
/// Activates all deferred tasks.
265 void runAllDeferred();
266
272
/// Discards all deferred tasks.
276 void discardAllDeferred();
277
282
/// Stops the task delegator.
/// @param mode what to do with tasks still in the deferred queue (defaults to KEEP_DEFERRED)
291 void stop(stopMode mode = stopMode::KEEP_DEFERRED);
292
298
304
/// Destructor: stops the pool and joins the worker threads.
309 ~taskDelegator();
310 };
311} // namespace original
312
313// ==================== Task Implementation ====================
314
// NOTE(review): the signature lines for this definition are missing from the rendered listing;
// the initializer below matches the declared task(Callback&& c, Args&&... args) constructor.
// Initialises the promise member with a nullary lambda that owns the forwarded callable and
// arguments (init-captures) and calls c(args...) when invoked. The lambda is `mutable`, so the
// captured copies are non-const inside the call.
318 : p([c = std::forward<Callback>(c), ...args = std::forward<Args>(args)]() mutable {
319 return c(args...);
320 }) {}
321
// task<TYPE>::run(): executes the task by delegating to the promise member's run().
// NOTE(review): the signature line (323) is missing from this rendered listing; the tooltip
// index places `void run() override` at tasks.h:323.
322template <typename TYPE>
324{
325 this->p.run();
326}
327
328template <typename TYPE>
333
334// ==================== Task Delegator Implementation ====================
335
// Compares two queue elements by priority.
// Both elements' second() values are converted to u_integer and compared with `<`, so the
// result is true when lhs has the numerically smaller priority value.
// @param lhs left element
// @param rhs right element
// @return true if lhs's priority value is less than rhs's
336template <typename COUPLE>
337bool original::taskDelegator::taskComparator<COUPLE>::operator()(const COUPLE& lhs, const COUPLE& rhs) const
338{
339 return static_cast<u_integer>(lhs.second()) < static_cast<u_integer>(rhs.second());
340}
341
// taskDelegator constructor: sizes the thread array to thread_cnt, clears the state flags and
// counters, then assigns a running worker thread to every slot.
// NOTE(review): the signature line (342) and line 351 are missing from this rendered listing;
// line 351 presumably declares the local `task` used below. Confirm against the real header.
343 : threads_(thread_cnt),
344 stopped_(false),
345 active_threads_(0),
346 idle_threads_(0) {
347 for (auto& thread_ : this->threads_) {
348 thread_ = thread {
// Worker body; captures `this`, so the pool object must outlive every worker.
349 [this]{
350 while (true) {
// Phase 1 (under the mutex): register as idle and wait for work or for stop.
352 {
353 uniqueLock lock(this->mutex_);
354 this->idle_threads_ += 1;
355 this->condition_.wait(this->mutex_, [this] {
356 return this->stopped_ || !this->tasks_waiting_.empty() || !this->task_immediate_.empty();
357 });
358
// Exit only when stopped AND both runnable queues are empty, so queued work is still run after stop().
359 if (this->stopped_ &&
360 this->tasks_waiting_.empty() &&
361 this->task_immediate_.empty()) {
362 this->idle_threads_ -= 1;
363 return;
364 }
365
// The immediate queue is always served before the prioritized waiting queue.
366 if (!this->task_immediate_.empty()) {
367 task = std::move(this->task_immediate_.pop());
368 } else {
369 task = std::move(this->tasks_waiting_.pop().first());
370 }
371 this->idle_threads_ -= 1;
372 }
373
// Phase 2: count this worker as active. The lock is re-acquired separately, so between the
// two scopes the worker is counted neither as idle nor as active.
374 {
375 uniqueLock lock(this->mutex_);
376 this->active_threads_ += 1;
377 }
// The task itself runs without holding the mutex.
378 task->run();
379 {
380 uniqueLock lock(this->mutex_);
381 this->active_threads_ -= 1;
382 }
383 }
384 }
385 };
386 }
387}
388
// Submits a callable with the default priority.
// Forwards c and args unchanged to the priority overload using priority::NORMAL.
// @param c    callable to execute
// @param args arguments for the callable
// @return whatever the priority overload returns (the task's future)
389template <typename Callback, typename ... Args>
390auto original::taskDelegator::submit(Callback&& c, Args&&... args)
391{
392 return this->submit(priority::NORMAL, std::forward<Callback>(c), std::forward<Args>(args)...);
393}
394
// Submits a callable with an explicit priority.
// Deduces the result type from c(args...), wraps the callable in a task and hands it to the
// private submit<ReturnType>() overload, whose future is returned.
// NOTE(review): line 399 is missing from this rendered listing; it presumably declares
// `new_task` as a strongPtr<task<ReturnType>> built from the forwarded arguments. Confirm.
// @param priority scheduling priority
// @param c        callable to execute
// @param args     arguments for the callable
395template <typename Callback, typename ... Args>
396auto original::taskDelegator::submit(const priority priority, Callback&& c, Args&&... args)
397{
398 using ReturnType = decltype(c(args...));
400 std::forward<Callback>(c),
401 std::forward<Args>(args)...
402 );
403 return this->submit<ReturnType>(priority, new_task);
404}
405
// Submits a callable to the immediate queue, waiting up to `timeout` for an idle worker.
// Throws sysError if the pool is already stopped, or if no worker is idle within the timeout.
// NOTE(review): line 410 is missing from this rendered listing; it presumably declares
// `new_task` as a strongPtr<task<ReturnType>>. Confirm.
// @param timeout maximum time to wait for idle_threads_ > 0
// @param c       callable to execute
// @param args    arguments for the callable
// @return future obtained from the task before it is enqueued
406template <typename Callback, typename ... Args>
407auto original::taskDelegator::submit(time::duration timeout, Callback&& c, Args&&... args)
408{
409 using ReturnType = decltype(c(args...));
411 std::forward<Callback>(c),
412 std::forward<Args>(args)...
413 );
414 auto f = new_task->getFuture();
415 {
416 uniqueLock lock(this->mutex_);
417 if (this->stopped_) {
418 throw sysError("taskDelegator already stopped");
419 }
// NOTE(review): this waits on the same condition_ the workers wait on, and the visible worker
// loop increments idle_threads_ without notifying condition_. A submitter blocked here may
// therefore not be woken when a worker becomes idle, and a notify() meant for a worker may be
// consumed by this waiter instead. Confirm against condition::waitFor semantics.
420 const bool success = this->condition_.waitFor(this->mutex_, timeout, [this]{
421 return this->idle_threads_ > 0;
422 });
423 if (!success) {
424 throw sysError("No idle threads available within timeout");
425 }
426 this->task_immediate_.push(std::move(new_task.template dynamicCastTo<taskBase>()));
427 }
// Notify after releasing the lock so a worker can pick the task up.
428 this->condition_.notify();
429 return f;
430}
431
// waitingCnt(): returns the number of waiting (non-immediate, non-deferred) tasks.
// Reads tasks_waiting_.size() while holding the mutex.
// NOTE(review): the signature line (432) is missing from this rendered listing; the tooltip
// index gives `u_integer waitingCnt() const noexcept`.
433{
434 uniqueLock lock(this->mutex_);
435 return this->tasks_waiting_.size();
436}
437
// immediateCnt(): returns the number of immediate tasks pending execution.
// Reads task_immediate_.size() while holding the mutex.
// NOTE(review): the signature line (438) is missing from this rendered listing; the tooltip
// index gives `u_integer immediateCnt() const noexcept`.
439{
440 uniqueLock lock(this->mutex_);
441 return this->task_immediate_.size();
442}
443
// Internal submission: enqueues an existing task according to its priority and returns its future.
// Throws sysError if the pool is stopped, if IMMEDIATE is requested while no worker is idle,
// or if the priority value is not one of the known enumerators.
// NOTE(review): line 445 (the return type) is missing from this rendered listing; the in-class
// declaration gives async::future<TYPE>.
// @param priority scheduling priority selecting the destination queue
// @param t        task to enqueue (converted to a taskBase pointer for storage)
444template <typename TYPE>
446original::taskDelegator::submit(const priority priority, strongPtr<task<TYPE>>& t)
447{
// The future is taken before the task is handed to a queue.
448 auto f = t->getFuture();
449 {
450 uniqueLock lock(this->mutex_);
451 if (this->stopped_) {
452 throw sysError("taskDelegator already stopped");
453 }
454 switch (priority) {
455 case priority::IMMEDIATE:
// Immediate tasks are accepted only while at least one worker is counted as idle.
456 if (this->idle_threads_ == 0) {
457 throw sysError("No idle threads now");
458 }
459 this->task_immediate_.push(std::move(t.template dynamicCastTo<taskBase>()));
460 break;
461 case priority::HIGH:
462 case priority::NORMAL:
463 case priority::LOW:
464 this->tasks_waiting_.push(priorityTask{t.template dynamicCastTo<taskBase>(), priority});
465 break;
466 case priority::DEFERRED:
// Deferred tasks are only parked: return without notifying any worker.
467 this->tasks_deferred_.push(t.template dynamicCastTo<taskBase>());
468 return f;
469 default:
470 throw sysError("Unknown priority");
471 }
472 }
// Wake a waiter now that runnable work is queued (done after the lock scope ends).
473 this->condition_.notify();
474 return f;
475}
476
// runDeferred(): activates one deferred task.
// Pops one entry from the deferred queue and pushes it onto the waiting queue tagged with
// priority::DEFERRED, then notifies the condition. Does nothing if no task is deferred.
// NOTE(review): the signature line (477) is missing from this rendered listing.
478{
479 {
480 uniqueLock lock(this->mutex_);
481 if (!this->tasks_deferred_.empty()) {
482 this->tasks_waiting_.push(priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
483 } else {
// Nothing to activate: skip the notification.
484 return;
485 }
486 }
487 this->condition_.notify();
488}
489
// runAllDeferred(): activates all deferred tasks.
// Drains the deferred queue into the waiting queue (each entry tagged priority::DEFERRED) and
// then calls notifyAll(). Returns without notifying if the deferred queue is empty.
// NOTE(review): the signature line (490) is missing from this rendered listing.
491{
492 {
493 uniqueLock lock(this->mutex_);
494 if (this->tasks_deferred_.empty()) {
495 return;
496 }
497 while (!this->tasks_deferred_.empty()) {
498 this->tasks_waiting_.push(priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
499 }
500 }
// Several tasks may have become runnable, hence notifyAll rather than notify.
501 this->condition_.notifyAll();
502}
503
// discardDeferred(): discards one deferred task.
// Pops one entry from the deferred queue if it is non-empty and drops it without running it.
// @return number of tasks remaining in the deferred queue after the removal
// NOTE(review): the signature line (504) is missing from this rendered listing; the tooltip
// index gives `u_integer discardDeferred()`.
505{
506 uniqueLock lock(this->mutex_);
507 if (!this->tasks_deferred_.empty()) {
508 this->tasks_deferred_.pop();
509 }
510 return this->tasks_deferred_.size();
511}
512
// discardAllDeferred(): discards all deferred tasks.
// Clears the deferred queue under the mutex; the emptiness check just skips the clear() call.
// NOTE(review): the signature line (513) is missing from this rendered listing.
514{
515 uniqueLock lock(this->mutex_);
516 if (!this->tasks_deferred_.empty()) {
517 this->tasks_deferred_.clear();
518 }
519}
520
// deferredCnt(): returns the number of deferred tasks.
// Reads tasks_deferred_.size() while holding the mutex.
// NOTE(review): the signature line (521) is missing from this rendered listing; the tooltip
// index gives `u_integer deferredCnt() const noexcept`.
522{
523 uniqueLock lock(this->mutex_);
524 return this->tasks_deferred_.size();
525}
526
// stop(mode): stops the task delegator.
// Handles the deferred queue according to `mode`, sets stopped_ and wakes every waiter so
// idle workers can observe the flag. Throws sysError for an unrecognised mode; in that case
// stopped_ is not set.
// NOTE(review): the signature line (527) is missing from this rendered listing; the in-class
// declaration is `void stop(stopMode mode = stopMode::KEEP_DEFERRED)`.
528{
529 {
530 uniqueLock lock(this->mutex_);
531 switch (mode) {
// The unqualified names below are the class-level convenience constants.
532 case RUN_DEFERRED:
// Move every deferred task to the waiting queue so workers still execute it.
533 while (!this->tasks_deferred_.empty()) {
534 this->tasks_waiting_.push(priorityTask{this->tasks_deferred_.pop(), DEFERRED});
535 }
536 break;
537 case DISCARD_DEFERRED:
538 this->tasks_deferred_.clear();
539 break;
540 case KEEP_DEFERRED:
// Deferred tasks are left in place.
541 break;
542 default:
543 throw sysError("Unknown stop mode");
544 }
545 this->stopped_ = true;
546 }
547 this->condition_.notifyAll();
548}
549
// activeThreads(): gets the number of active threads.
// Reads active_threads_ while holding the mutex.
// NOTE(review): the signature line (550) is missing from this rendered listing; the tooltip
// index gives `u_integer activeThreads() const noexcept`.
551{
552 uniqueLock lock(this->mutex_);
553 return this->active_threads_;
554}
555
// idleThreads(): gets the number of idle threads.
// Reads idle_threads_ while holding the mutex.
// NOTE(review): the signature line (556) is missing from this rendered listing; the tooltip
// index gives `u_integer idleThreads() const noexcept`.
557{
558 uniqueLock lock(this->mutex_);
559 return this->idle_threads_;
560}
561
// ~taskDelegator(): stops the pool in RUN_DEFERRED mode, then joins every joinable worker.
// NOTE(review): the signature line (562) is missing from this rendered listing.
// NOTE(review): if stop() was already called earlier with KEEP_DEFERRED, workers may have
// exited once their queues drained. Deferred tasks moved to the waiting queue by the call
// below would then have no worker left to run them. Confirm whether that is intended.
563{
564 this->stop(stopMode::RUN_DEFERRED);
565 for (auto& thread : threads_) {
566 if (thread.joinable())
567 thread.join();
568 }
569}
570
571#endif //ORIGINAL_TASKS_H
Provides the array class for a fixed-size container with random access.
Represents a one-shot future result of an asynchronous computation.
Definition async.h:165
Represents a one-time asynchronous producer with result setting capability.
Definition async.h:349
Unique ownership smart pointer with move semantics.
Definition ownerPtr.h:37
Shared ownership smart pointer with strong references.
Definition refCntPtr.h:108
Exception for generic system failure.
Definition error.h:413
Abstract base class for all tasks.
Definition tasks.h:60
virtual ~taskBase()=default
Virtual destructor for proper polymorphic behavior.
virtual void run()=0
Executes the task.
Concrete task implementation with future/promise support.
Definition tasks.h:83
async::future< TYPE > getFuture()
Gets the future associated with this task.
Definition tasks.h:329
task & operator=(task &&)=default
Allow move assignment.
task(const task &)=delete
Disable copy constructor.
void run() override
Executes the task.
Definition tasks.h:323
task(task &&)=default
Allow move constructor.
task & operator=(const task &)=delete
Disable copy assignment.
Thread pool for managing and executing prioritized tasks.
Definition tasks.h:49
stopMode
Stop behavior for deferred tasks.
Definition tasks.h:133
@ DISCARD_DEFERRED
Discard deferred tasks.
@ RUN_DEFERRED
Execute all deferred tasks before stopping.
@ KEEP_DEFERRED
Keep deferred tasks.
void discardAllDeferred()
Discards all deferred tasks.
Definition tasks.h:513
void runAllDeferred()
Activates all deferred tasks.
Definition tasks.h:490
u_integer immediateCnt() const noexcept
Returns the number of immediate tasks pending execution.
Definition tasks.h:438
priority
Task priority levels for execution scheduling.
Definition tasks.h:121
@ NORMAL
Normal priority task (default)
@ IMMEDIATE
Execute immediately if threads available.
@ DEFERRED
Deferred execution.
u_integer discardDeferred()
Discards one deferred task.
Definition tasks.h:504
u_integer idleThreads() const noexcept
Gets the number of idle threads.
Definition tasks.h:556
u_integer activeThreads() const noexcept
Gets the number of active threads.
Definition tasks.h:550
taskDelegator & operator=(taskDelegator &&)=delete
Disable move assignment.
taskDelegator(const taskDelegator &)=delete
Disable copy constructor.
u_integer deferredCnt() const noexcept
Returns number of deferred tasks.
Definition tasks.h:521
taskDelegator & operator=(const taskDelegator &)=delete
Disable copy assignment.
taskDelegator(taskDelegator &&)=delete
Disable move constructor.
~taskDelegator()
Destructor.
Definition tasks.h:562
void stop(stopMode mode=stopMode::KEEP_DEFERRED)
Stops the task delegator.
Definition tasks.h:527
void runDeferred()
Activates one deferred task.
Definition tasks.h:477
u_integer waitingCnt() const noexcept
Returns the number of waiting (non-immediate, non-deferred) tasks.
Definition tasks.h:432
High-level thread wrapper.
Definition thread.h:307
void join() override
Wait for thread to complete.
Definition thread.h:826
bool joinable() const override
Check if thread is joinable.
Definition thread.h:850
Represents a time duration with nanosecond precision.
Definition zeit.h:143
RAII wrapper for single mutex locking.
Definition mutex.h:280
Main namespace for the project Original.
Definition algorithms.h:21
Standard namespace extensions for original::alternative.
Definition allocator.h:351
Priority queue container implementation.
Queue container adapter implementation.
Reference-counted smart pointer hierarchy.
Dynamic array container with automatic resizing.