26#ifndef ORIGINAL_TASKS_H
27#define ORIGINAL_TASKS_H
65 virtual void run() = 0;
82 template<
typename TYPE>
158 template<
typename COUPLE>
159 struct taskComparator {
169 using priorityTaskQueue = prique<priorityTask, taskComparator, vector>;
171 array<thread> threads_;
172 priorityTaskQueue tasks_waiting_;
173 queue<strongPtr<taskBase>> task_immediate_;
174 queue<strongPtr<taskBase>> tasks_deferred_;
175 mutable condition condition_;
176 mutable mutex mutex_;
178 u_integer active_threads_;
179 u_integer idle_threads_;
188 template<
typename TYPE>
322template <
typename TYPE>
328template <
typename TYPE>
331 return this->p.getFuture();
336template <
typename COUPLE>
337bool original::taskDelegator::taskComparator<COUPLE>::operator()(
const COUPLE&
lhs,
const COUPLE&
rhs)
const
347 for (
auto&
thread_ : this->threads_) {
354 this->idle_threads_ += 1;
355 this->condition_.wait(this->mutex_, [
this] {
356 return this->stopped_ || !this->tasks_waiting_.empty() || !this->task_immediate_.empty();
359 if (this->stopped_ &&
360 this->tasks_waiting_.empty() &&
361 this->task_immediate_.empty()) {
362 this->idle_threads_ -= 1;
366 if (!this->task_immediate_.empty()) {
367 task = std::move(this->task_immediate_.pop());
369 task = std::move(this->tasks_waiting_.pop().first());
371 this->idle_threads_ -= 1;
376 this->active_threads_ += 1;
381 this->active_threads_ -= 1;
392 return this->submit(priority::NORMAL, std::forward<Callback>(c), std::forward<Args>(
args)...);
398 using ReturnType =
decltype(c(
args...));
400 std::forward<Callback>(c),
401 std::forward<Args>(
args)...
409 using ReturnType =
decltype(c(
args...));
411 std::forward<Callback>(c),
412 std::forward<Args>(
args)...
417 if (this->stopped_) {
418 throw sysError(
"taskDelegator already stopped");
420 const bool success = this->condition_.waitFor(this->mutex_,
timeout, [
this]{
421 return this->idle_threads_ > 0;
424 throw sysError(
"No idle threads available within timeout");
428 this->condition_.notify();
435 return this->tasks_waiting_.size();
441 return this->task_immediate_.size();
444template <
typename TYPE>
448 auto f =
t->getFuture();
451 if (this->stopped_) {
452 throw sysError(
"taskDelegator already stopped");
455 case priority::IMMEDIATE:
456 if (this->idle_threads_ == 0) {
457 throw sysError(
"No idle threads now");
459 this->task_immediate_.push(std::move(t.template dynamicCastTo<taskBase>()));
462 case priority::NORMAL:
464 this->tasks_waiting_.push(priorityTask{t.template dynamicCastTo<taskBase>(), priority});
466 case priority::DEFERRED:
467 this->tasks_deferred_.push(t.template dynamicCastTo<taskBase>());
470 throw sysError(
"Unknown priority");
473 this->condition_.notify();
481 if (!this->tasks_deferred_.empty()) {
482 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
487 this->condition_.notify();
494 if (this->tasks_deferred_.empty()) {
497 while (!this->tasks_deferred_.empty()) {
498 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
501 this->condition_.notifyAll();
507 if (!this->tasks_deferred_.empty()) {
508 this->tasks_deferred_.pop();
510 return this->tasks_deferred_.size();
516 if (!this->tasks_deferred_.empty()) {
517 this->tasks_deferred_.clear();
524 return this->tasks_deferred_.size();
533 while (!this->tasks_deferred_.empty()) {
534 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), DEFERRED});
537 case DISCARD_DEFERRED:
538 this->tasks_deferred_.clear();
543 throw sysError(
"Unknown stop mode");
545 this->stopped_ =
true;
547 this->condition_.notifyAll();
553 return this->active_threads_;
559 return this->idle_threads_;
564 this->stop(stopMode::RUN_DEFERRED);
565 for (
auto&
thread : threads_) {
Provides the array class for a fixed-size container with random access.
Represents a one-shot future result of an asynchronous computation.
Definition async.h:165
Represents a one-time asynchronous producer with result setting capability.
Definition async.h:349
Unique ownership smart pointer with move semantics.
Definition ownerPtr.h:37
Shared ownership smart pointer with strong references.
Definition refCntPtr.h:108
Exception for generic system failure.
Definition error.h:413
Abstract base class for all tasks.
Definition tasks.h:60
virtual ~taskBase()=default
Virtual destructor for proper polymorphic behavior.
virtual void run()=0
Executes the task.
Concrete task implementation with future/promise support.
Definition tasks.h:83
async::future< TYPE > getFuture()
Gets the future associated with this task.
Definition tasks.h:329
task & operator=(task &&)=default
Allow move assignment.
task(const task &)=delete
Disable copy constructor.
void run() override
Executes the task.
Definition tasks.h:323
task(task &&)=default
Allow move constructor.
task & operator=(const task &)=delete
Disable copy assignment.
Thread pool for managing and executing prioritized tasks.
Definition tasks.h:49
stopMode
Stop behavior for deferred tasks.
Definition tasks.h:133
@ DISCARD_DEFERRED
Discard deferred tasks.
@ RUN_DEFERRED
Execute all deferred tasks before stopping.
@ KEEP_DEFERRED
Keep deferred tasks.
void discardAllDeferred()
Discards all deferred tasks.
Definition tasks.h:513
void runAllDeferred()
Activates all deferred tasks.
Definition tasks.h:490
u_integer immediateCnt() const noexcept
Returns the number of immediate tasks pending execution.
Definition tasks.h:438
priority
Task priority levels for execution scheduling.
Definition tasks.h:121
@ NORMAL
Normal priority task (default)
@ HIGH
High priority task.
@ IMMEDIATE
Execute immediately if threads available.
@ DEFERRED
Deferred execution.
u_integer discardDeferred()
Discards one deferred task.
Definition tasks.h:504
u_integer idleThreads() const noexcept
Gets the number of idle threads.
Definition tasks.h:556
u_integer activeThreads() const noexcept
Gets the number of active threads.
Definition tasks.h:550
taskDelegator & operator=(taskDelegator &&)=delete
Disable move assignment.
taskDelegator(const taskDelegator &)=delete
Disable copy constructor.
u_integer deferredCnt() const noexcept
Returns number of deferred tasks.
Definition tasks.h:521
taskDelegator & operator=(const taskDelegator &)=delete
Disable copy assignment.
taskDelegator(taskDelegator &&)=delete
Disable move constructor.
~taskDelegator()
Destructor.
Definition tasks.h:562
void stop(stopMode mode=stopMode::KEEP_DEFERRED)
Stops the task delegator.
Definition tasks.h:527
void runDeferred()
Activates one deferred task.
Definition tasks.h:477
u_integer waitingCnt() const noexcept
Returns the number of waiting (non-immediate, non-deferred) tasks.
Definition tasks.h:432
High-level thread wrapper.
Definition thread.h:307
void join() override
Wait for thread to complete.
Definition thread.h:826
bool joinable() const override
Check if thread is joinable.
Definition thread.h:850
Represents a time duration with nanosecond precision.
Definition zeit.h:143
RAII wrapper for single mutex locking.
Definition mutex.h:280
Main namespace for the project Original.
Definition algorithms.h:21
Standard namespace extensions for original::alternative.
Definition allocator.h:351
Priority queue container implementation.
Queue container adapter implementation.
Reference-counted smart pointer hierarchy.
Dynamic array container with automatic resizing.