26#ifndef ORIGINAL_TASKS_H
27#define ORIGINAL_TASKS_H
64 virtual void run() = 0;
69 virtual ~taskBase() =
default;
81 template<
typename TYPE>
82 class task final :
public taskBase {
86 task(
const task&) =
delete;
87 task& operator=(
const task&) =
delete;
88 task(task&&) =
default;
89 task& operator=(task&&) =
default;
99 template<
typename Callback,
typename... Args>
100 explicit task(Callback&& c, Args&&... args);
158 template<
typename COUPLE>
159 struct taskComparator {
166 bool operator()(
const COUPLE& lhs,
const COUPLE& rhs)
const;
169 using priorityTaskQueue = prique<priorityTask, taskComparator, vector>;
171 array<thread> threads_;
172 priorityTaskQueue tasks_waiting_;
173 queue<strongPtr<taskBase>> task_immediate_;
174 queue<strongPtr<taskBase>> tasks_deferred_;
175 mutable pCondition condition_;
176 mutable pMutex mutex_;
188 template<
typename TYPE>
211 template<
typename Callback,
typename... Args>
212 auto submit(Callback&& c, Args&&... args);
226 template<
typename Callback,
typename... Args>
244 template<
typename Callback,
typename... Args>
245 auto submit(
time::duration timeout, Callback&& c, Args&&... args);
315template <typename TYPE>
316template <typename Callback, typename... Args>
318 : p([c = std::forward<Callback>(c), ...args = std::forward<Args>(args)]() mutable {
322template <
typename TYPE>
323void original::taskDelegator::task<TYPE>::run()
328template <
typename TYPE>
331 return this->p.getFuture();
336template <
typename COUPLE>
337bool original::taskDelegator::taskComparator<COUPLE>::operator()(
const COUPLE& lhs,
const COUPLE& rhs)
const
343 : threads_(thread_cnt),
347 for (
auto& thread_ : this->threads_) {
354 this->idle_threads_ += 1;
355 this->condition_.wait(this->mutex_, [
this] {
356 return this->stopped_ || !this->tasks_waiting_.empty() || !this->task_immediate_.empty();
359 if (this->stopped_ &&
360 this->tasks_waiting_.empty() &&
361 this->task_immediate_.empty()) {
362 this->idle_threads_ -= 1;
366 if (!this->task_immediate_.empty()) {
367 task = std::move(this->task_immediate_.pop());
369 task = std::move(this->tasks_waiting_.pop().first());
371 this->idle_threads_ -= 1;
376 this->active_threads_ += 1;
381 this->active_threads_ -= 1;
389template <
typename Callback,
typename ... Args>
390auto original::taskDelegator::submit(Callback&& c, Args&&... args)
392 return this->submit(priority::NORMAL, std::forward<Callback>(c), std::forward<Args>(args)...);
395template <
typename Callback,
typename ... Args>
398 using ReturnType =
decltype(c(args...));
400 std::forward<Callback>(c),
401 std::forward<Args>(args)...
403 return this->submit<ReturnType>(
priority, new_task);
406template <
typename Callback,
typename ... Args>
407auto original::taskDelegator::submit(
time::duration timeout, Callback&& c, Args&&... args)
409 using ReturnType =
decltype(c(args...));
411 std::forward<Callback>(c),
412 std::forward<Args>(args)...
414 auto f = new_task->getFuture();
417 if (this->stopped_) {
418 throw sysError(
"taskDelegator already stopped");
420 const bool success = this->condition_.waitFor(this->mutex_, timeout, [
this]{
421 return this->idle_threads_ > 0;
424 throw sysError(
"No idle threads available within timeout");
426 this->task_immediate_.push(std::move(new_task.template dynamicCastTo<taskBase>()));
428 this->condition_.notify();
435 return this->tasks_waiting_.size();
441 return this->task_immediate_.size();
444template <
typename TYPE>
446original::taskDelegator::submit(
const priority priority,
strongPtr<task<TYPE>>& t)
448 auto f = t->getFuture();
451 if (this->stopped_) {
452 throw sysError(
"taskDelegator already stopped");
455 case priority::IMMEDIATE:
456 if (this->idle_threads_ == 0) {
457 throw sysError(
"No idle threads now");
459 this->task_immediate_.push(std::move(t.template dynamicCastTo<taskBase>()));
462 case priority::NORMAL:
464 this->tasks_waiting_.push(priorityTask{t.template dynamicCastTo<taskBase>(), priority});
466 case priority::DEFERRED:
467 this->tasks_deferred_.push(t.template dynamicCastTo<taskBase>());
470 throw sysError(
"Unknown priority");
473 this->condition_.notify();
481 if (!this->tasks_deferred_.empty()) {
482 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
487 this->condition_.notify();
494 if (this->tasks_deferred_.empty()) {
497 while (!this->tasks_deferred_.empty()) {
498 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
501 this->condition_.notifyAll();
507 if (!this->tasks_deferred_.empty()) {
508 this->tasks_deferred_.pop();
510 return this->tasks_deferred_.size();
516 if (!this->tasks_deferred_.empty()) {
517 this->tasks_deferred_.clear();
524 return this->tasks_deferred_.size();
533 while (!this->tasks_deferred_.empty()) {
534 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), DEFERRED});
537 case DISCARD_DEFERRED:
538 this->tasks_deferred_.clear();
543 throw sysError(
"Unknown stop mode");
545 this->stopped_ =
true;
547 this->condition_.notifyAll();
553 return this->active_threads_;
559 return this->idle_threads_;
564 this->stop(stopMode::RUN_DEFERRED);
565 for (
auto&
thread : threads_) {
Provides the array class for a fixed-size container with random access.
Represents a one-shot future result of an asynchronous computation.
Definition async.h:165
Represents a one-time asynchronous producer with result setting capability.
Definition async.h:349
Container for two heterogeneous elements.
Definition couple.h:37
Shared ownership smart pointer with strong references.
Definition refCntPtr.h:108
Exception for generic system failure.
Definition error.h:306
Thread pool for managing and executing prioritized tasks.
Definition tasks.h:49
stopMode
Stop behavior for deferred tasks.
Definition tasks.h:133
@ DISCARD_DEFERRED
Discard deferred tasks.
@ RUN_DEFERRED
Execute all deferred tasks before stopping.
@ KEEP_DEFERRED
Keep deferred tasks.
void discardAllDeferred()
Discards all deferred tasks.
Definition tasks.h:513
void runAllDeferred()
Activates all deferred tasks by moving them into the waiting queue.
Definition tasks.h:490
u_integer immediateCnt() const noexcept
Returns the number of immediate tasks pending execution.
Definition tasks.h:438
priority
Task priority levels for execution scheduling.
Definition tasks.h:121
@ NORMAL
Normal priority task (default).
@ HIGH
High priority task.
@ IMMEDIATE
Execute immediately if an idle thread is available; submission throws otherwise.
@ DEFERRED
Deferred execution.
u_integer discardDeferred()
Discards one deferred task and returns the number of deferred tasks remaining.
Definition tasks.h:504
u_integer idleThreads() const noexcept
Gets the number of idle threads.
Definition tasks.h:556
u_integer activeThreads() const noexcept
Gets the number of active threads.
Definition tasks.h:550
taskDelegator & operator=(taskDelegator &&)=delete
Disable move assignment.
taskDelegator(const taskDelegator &)=delete
Disable copy constructor.
u_integer deferredCnt() const noexcept
Returns the number of deferred tasks.
Definition tasks.h:521
taskDelegator & operator=(const taskDelegator &)=delete
Disable copy assignment.
taskDelegator(taskDelegator &&)=delete
Disable move constructor.
~taskDelegator()
Destructor.
Definition tasks.h:562
void stop(stopMode mode=stopMode::KEEP_DEFERRED)
Stops the task delegator.
Definition tasks.h:527
void runDeferred()
Activates one deferred task by moving it into the waiting queue.
Definition tasks.h:477
u_integer waitingCnt() const noexcept
Returns the number of waiting (non-immediate, non-deferred) tasks.
Definition tasks.h:432
High-level thread wrapper.
Definition thread.h:263
void join() override
Wait for thread to complete.
Definition thread.h:656
bool joinable() const override
Check if thread is joinable.
Definition thread.h:680
Represents a time duration with nanosecond precision.
Definition zeit.h:134
RAII wrapper for single mutex locking.
Definition mutex.h:216
std::uint32_t u_integer
32-bit unsigned integer type for sizes and indexes
Definition config.h:263
Main namespace for the project Original.
Definition algorithms.h:21
Priority queue container implementation.
Queue container adapter implementation.
Reference-counted smart pointer hierarchy.
Dynamic array container with automatic resizing.