1#ifndef ORIGINAL_TASKS_H
2#define ORIGINAL_TASKS_H
26 virtual void run() = 0;
42 template<
typename TYPE>
49 task& operator=(
const task&) =
delete;
67 template<
typename Callback,
typename... Args>
68 explicit task(Callback&& c, Args&&... args);
120 template<
typename COUPLE>
121 struct taskComparator {
128 bool operator()(
const COUPLE& lhs,
const COUPLE& rhs)
const;
131 using priorityTaskQueue = prique<priorityTask, taskComparator, vector>;
134 array<thread> threads_;
135 priorityTaskQueue tasks_waiting_;
136 queue<strongPtr<taskBase>> task_immediate_;
137 queue<strongPtr<taskBase>> tasks_deferred_;
138 mutable pCondition condition_;
139 mutable pMutex mutex_;
159 template<
typename Callback,
typename... Args>
160 auto submit(Callback&& c, Args&&... args);
171 template<
typename Callback,
typename... Args>
180 template<
typename TYPE>
190 template<
typename TYPE>
227template <typename TYPE>
228template <typename Callback, typename... Args>
230 : p([c = std::forward<Callback>(c), ...args = std::forward<Args>(args)]() mutable {
234template <
typename TYPE>
240template <
typename TYPE>
243 return this->p.getFuture();
246template <
typename COUPLE>
247bool original::taskDelegator::taskComparator<COUPLE>::operator()(
const COUPLE& lhs,
const COUPLE& rhs)
const
253 : threads_(thread_cnt),
257 for (
auto& thread_ : this->threads_) {
264 this->idle_threads_ += 1;
265 this->condition_.wait(this->mutex_, [
this] {
266 return this->stopped_ || !this->tasks_waiting_.empty() || !this->task_immediate_.empty();
269 if (this->stopped_ &&
270 this->tasks_waiting_.empty() &&
271 this->tasks_deferred_.empty() &&
272 this->task_immediate_.empty()) {
273 this->idle_threads_ -= 1;
277 if (!this->task_immediate_.empty()) {
278 task = std::move(this->task_immediate_.pop());
280 task = std::move(this->tasks_waiting_.pop().first());
282 this->idle_threads_ -= 1;
287 this->active_threads_ += 1;
292 this->active_threads_ -= 1;
300template <
typename Callback,
typename ... Args>
303 return this->submit(priority::NORMAL, std::forward<Callback>(c), std::forward<Args>(args)...);
306template <
typename Callback,
typename ... Args>
309 using ReturnType =
decltype(c(args...));
311 std::forward<Callback>(c),
312 std::forward<Args>(args)...
314 return this->submit<ReturnType>(
priority, new_task);
317template <
typename TYPE>
320 return this->submit(priority::NORMAL, t);
323template <
typename TYPE>
327 auto f = t->getFuture();
329 uniqueLock lock(this->mutex_);
330 if (this->stopped_) {
331 throw sysError(
"taskDelegator already stopped");
334 case priority::IMMEDIATE:
335 if (this->idle_threads_ == 0) {
336 throw sysError(
"No idle threads now");
338 this->task_immediate_.push(std::move(t.template dynamicCastTo<taskBase>()));
341 case priority::NORMAL:
343 this->tasks_waiting_.push(priorityTask{t.template dynamicCastTo<taskBase>(), priority});
345 case priority::DEFERRED:
346 this->tasks_deferred_.push(t.template dynamicCastTo<taskBase>());
349 throw sysError(
"Unknown priority");
352 this->condition_.notify();
360 if (!this->tasks_deferred_.empty()) {
361 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
366 this->condition_.notify();
373 if (this->tasks_deferred_.empty()) {
376 while (!this->tasks_deferred_.empty()) {
377 this->tasks_waiting_.push(
priorityTask{this->tasks_deferred_.pop(), priority::DEFERRED});
380 this->condition_.notifyAll();
387 this->stopped_ =
true;
389 this->condition_.notifyAll();
391 for (
auto& thread_ : this->threads_) {
392 if (thread_.joinable()) {
401 return this->active_threads_;
407 return this->idle_threads_;
415 stopped = this->stopped_;
Provides the array class for a fixed-size container with random access.
Represents a future result of an asynchronous computation.
Definition async.h:100
Represents a promise of a future result.
Definition async.h:152
Container for two heterogeneous elements.
Definition couple.h:37
Shared ownership smart pointer with strong references.
Definition refCntPtr.h:108
Abstract base class for all task types.
Definition tasks.h:21
virtual ~taskBase()=default
Virtual destructor for proper polymorphism.
virtual void run()=0
Executes the task.
Thread pool for managing and executing tasks with priority.
Definition tasks.h:90
async::future< TYPE > submit(strongPtr< task< TYPE > > &t)
Submits a pre-created task with normal priority.
void stop()
Stops the task delegator and waits for completion.
Definition tasks.h:383
void runAllDeferred()
Moves all deferred tasks to the waiting queue.
Definition tasks.h:369
priority
Task priority levels for execution scheduling.
Definition tasks.h:96
@ NORMAL
Normal priority task (default)
@ HIGH
High priority task.
@ IMMEDIATE
Execute immediately if threads available.
@ DEFERRED
Deferred execution (manual activation)
u_integer idleThreads() const noexcept
Gets the number of idle threads.
Definition tasks.h:404
u_integer activeThreads() const noexcept
Gets the number of active threads.
Definition tasks.h:398
auto submit(Callback &&c, Args &&... args)
Submits a task with normal priority.
Definition tasks.h:301
async::future< TYPE > submit(priority priority, strongPtr< task< TYPE > > &t)
Submits a pre-created task with specified priority.
taskDelegator(u_integer thread_cnt=8)
Constructs a task delegator with specified thread count.
Definition tasks.h:252
~taskDelegator()
Destructor - automatically stops if not already stopped.
Definition tasks.h:410
void runDeferred()
Moves one deferred task to the waiting queue.
Definition tasks.h:356
Concrete task implementation with result type.
Definition tasks.h:43
void run() override
Executes the task computation.
Definition tasks.h:235
async::future< TYPE > getFuture()
Gets the future associated with this task.
Definition tasks.h:241
task()=default
Default constructor.
High-level thread wrapper.
Definition thread.h:263
RAII wrapper for single mutex locking.
Definition mutex.h:216
std::uint32_t u_integer
32-bit unsigned integer type for sizes and indexes
Definition config.h:263
Main namespace for the project Original.
Definition algorithms.h:21
strongPtr< T, DEL > makeStrongPtr(Args &&... args)
Creates a new strongPtr managing a shared object.
Definition refCntPtr.h:633
Priority queue container implementation.
Queue container adapter implementation.
Reference-counted smart pointer hierarchy.
Dynamic array container with automatic resizing.