LCOV - code coverage report
Current view: top level - src/jobs - JobSystem.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 92.9 % 127 118
Test Date: 2026-04-10 19:03:25 Functions: 100.0 % 16 16

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2025 aeml
       3              :  *
       4              :  * This program is free software: you can redistribute it and/or modify
       5              :  * it under the terms of the GNU General Public License as published by
       6              :  * the Free Software Foundation, either version 3 of the License, or
       7              :  * (at your option) any later version.
       8              :  *
       9              :  * This program is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12              :  * GNU General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU General Public License
      15              :  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
      16              :  */
      17              : 
      18              : #include "jobs/JobSystem.hpp"
      19              : 
      20              : #include <atomic>
      21              : #include <condition_variable>
      22              : #include <exception>
      23              : #include <mutex>
      24              : #include <queue>
      25              : #include <thread>
      26              : #include <unordered_map>
      27              : 
      28              : namespace jobs
      29              : {
      30              :     namespace
      31              :     {
      32              :         class FunctionJob final : public IJob
      33              :         {
      34              :         public:
      35       160162 :             explicit FunctionJob(std::function<void()> fn)
      36       160162 :                 : m_fn(std::move(fn))
      37              :             {
      38       160162 :             }
      39              : 
      40       160162 :             void Execute() override
      41              :             {
      42       160162 :                 if (m_fn)
      43              :                 {
      44       160162 :                     m_fn();
      45              :                 }
      46       160160 :             }
      47              : 
      48              :         private:
      49              :             std::function<void()> m_fn;
      50              :         };
      51              :     }
      52              : 
      53              :     struct JobSystem::Impl
      54              :     {
      55              :         std::vector<std::thread>          workers;
      56              :         std::queue<std::unique_ptr<IJob>> jobs;
      57              :         std::mutex                        mutex;
      58              :         std::condition_variable           cv;
      59              :         std::atomic<bool>                 running{true};
      60              :         std::atomic<std::size_t>          nextId{1};
      61              : 
      62              :         struct JobState
      63              :         {
      64              :             std::atomic<bool> completed{false};
      65              :             std::exception_ptr failure;
      66              :             std::mutex m;
      67              :             std::condition_variable cv;
      68              :         };
      69              : 
      70              :         std::unordered_map<std::size_t, std::shared_ptr<JobState>> states;
      71              :         std::unordered_map<std::size_t, std::exception_ptr> completedFailures;
      72              :     };
      73              : 
      74           46 :     JobSystem::JobSystem()
      75           46 :         : m_impl(std::make_unique<Impl>())
      76              :     {
      77           46 :         const auto workerCount = std::max(1u, std::thread::hardware_concurrency());
      78           46 :         m_impl->workers.reserve(workerCount);
      79              : 
      80          230 :         for (unsigned i = 0; i < workerCount; ++i)
      81              :         {
      82          368 :             m_impl->workers.emplace_back([impl = m_impl.get()]()
      83              :             {
      84              :                 for (;;)
      85              :                 {
      86       160346 :                     std::unique_ptr<IJob> job;
      87              : 
      88              :                     {
      89       160346 :                         std::unique_lock<std::mutex> lock{impl->mutex};
      90       160346 :                         impl->cv.wait(lock, [&]
      91              :                         {
      92       305538 :                             return !impl->running.load() || !impl->jobs.empty();
      93              :                         });
      94              : 
      95       160346 :                         if (!impl->running.load() && impl->jobs.empty())
      96              :                         {
      97          368 :                             return;
      98              :                         }
      99              : 
     100       160162 :                         job = std::move(impl->jobs.front());
     101       160162 :                         impl->jobs.pop();
     102       160346 :                     }
     103              : 
     104       160162 :                     if (job)
     105              :                     {
     106       160162 :                         job->Execute();
     107              :                     }
     108       320508 :                 }
     109              :             });
     110              :         }
     111           46 :     }
     112              : 
     113           92 :     JobSystem::~JobSystem()
     114              :     {
     115           46 :         if (!m_impl)
     116              :         {
     117            0 :             return;
     118              :         }
     119              : 
     120              :         {
     121           46 :             std::lock_guard<std::mutex> lock{m_impl->mutex};
     122           46 :             m_impl->running.store(false);
     123           46 :         }
     124           46 :         m_impl->cv.notify_all();
     125              : 
     126          230 :         for (auto& worker : m_impl->workers)
     127              :         {
     128          184 :             if (worker.joinable())
     129              :             {
     130          184 :                 worker.join();
     131              :             }
     132              :         }
     133           46 :     }
     134              : 
     135       160162 :     JobHandle JobSystem::Schedule(std::unique_ptr<IJob> job)
     136              :     {
     137       160162 :         if (!job)
     138              :         {
     139            0 :             return JobHandle{};
     140              :         }
     141              : 
     142              :         struct WrappedJob : public IJob
     143              :         {
     144              :             std::unique_ptr<IJob> inner;
     145              :             std::shared_ptr<Impl::JobState> state;
     146              :             Impl* impl;
     147              :             std::size_t id;
     148       160162 :             void Execute() override
     149              :             {
     150       160162 :                 std::exception_ptr failure;
     151              :                 try
     152              :                 {
     153       160162 :                     if (inner)
     154              :                     {
     155       160162 :                         inner->Execute();
     156              :                     }
     157              :                 }
     158            2 :                 catch (...)
     159              :                 {
     160            2 :                     failure = std::current_exception();
     161            2 :                 }
     162              : 
     163       160162 :                 state->failure = failure;
     164       160162 :                 state->completed.store(true, std::memory_order_release);
     165              :                 {
     166       160162 :                     std::lock_guard<std::mutex> l{state->m};
     167       160162 :                     state->cv.notify_all();
     168       160162 :                 }
     169              :                 // Cleanup state map entry to avoid growth
     170       160162 :                 if (impl)
     171              :                 {
     172       160162 :                     std::lock_guard<std::mutex> lock{impl->mutex};
     173       160162 :                     auto stateIt = impl->states.find(id);
     174       160162 :                     if (stateIt != impl->states.end())
     175              :                     {
     176       152952 :                         if (failure)
     177              :                         {
     178            2 :                             impl->completedFailures[id] = failure;
     179              :                         }
     180       152952 :                         impl->states.erase(stateIt);
     181              :                     }
     182       160162 :                 }
     183       160162 :             }
     184              :         };
     185              : 
     186       160162 :         JobHandle handle;
     187       160162 :         handle.id = m_impl->nextId.fetch_add(1);
     188              : 
     189       160162 :         auto state = std::make_shared<Impl::JobState>();
     190              :         {
     191       160162 :             std::lock_guard<std::mutex> lock{m_impl->mutex};
     192       160162 :             m_impl->states.emplace(handle.id, state);
     193       160162 :             auto wrapped = std::make_unique<WrappedJob>();
     194       160162 :             wrapped->inner = std::move(job);
     195       160162 :             wrapped->state = state;
     196       160162 :             wrapped->impl = m_impl.get();
     197       160162 :             wrapped->id = handle.id;
     198       160162 :             m_impl->jobs.push(std::move(wrapped));
     199       160162 :         }
     200       160162 :         m_impl->cv.notify_one();
     201              : 
     202       160162 :         return handle;
     203       160162 :     }
     204              : 
     205       160162 :     JobHandle JobSystem::ScheduleFunction(const std::function<void()>& fn)
     206              :     {
     207       160162 :         return Schedule(std::make_unique<FunctionJob>(fn));
     208              :     }
     209              : 
     210       160162 :     void JobSystem::Wait(const JobHandle& handle)
     211              :     {
     212       160162 :         if (!m_impl || handle.id == 0)
     213              :         {
     214       105623 :             return;
     215              :         }
     216              : 
     217       160162 :         std::shared_ptr<Impl::JobState> state;
     218              :         {
     219       160162 :             std::lock_guard<std::mutex> lock{m_impl->mutex};
     220       160162 :             auto it = m_impl->states.find(handle.id);
     221       160162 :             if (it == m_impl->states.end())
     222              :             {
     223       105623 :                 auto failed = m_impl->completedFailures.find(handle.id);
     224       105623 :                 if (failed != m_impl->completedFailures.end())
     225              :                 {
     226            0 :                     auto ex = failed->second;
     227            0 :                     m_impl->completedFailures.erase(failed);
     228            0 :                     std::rethrow_exception(ex);
     229            0 :                 }
     230              : 
     231              :                 // Already completed and cleaned up.
     232       105623 :                 return;
     233              :             }
     234        54539 :             state = it->second;
     235       160162 :         }
     236              : 
     237        54539 :         if (!state)
     238              :         {
     239            0 :             return;
     240              :         }
     241              : 
     242        54539 :         auto finishWait = [&]()
     243              :         {
     244        54539 :             std::lock_guard<std::mutex> lock{m_impl->mutex};
     245        54539 :             m_impl->states.erase(handle.id);
     246        54539 :             m_impl->completedFailures.erase(handle.id);
     247        54539 :         };
     248              : 
     249        54539 :         if (!state->completed.load(std::memory_order_acquire))
     250              :         {
     251        46695 :             std::unique_lock<std::mutex> lk{state->m};
     252       139634 :             state->cv.wait(lk, [&]{ return state->completed.load(std::memory_order_acquire); });
     253        46695 :         }
     254              : 
     255        54539 :         auto failure = state->failure;
     256        54539 :         finishWait();
     257              : 
     258        54539 :         if (failure)
     259              :         {
     260            4 :             std::rethrow_exception(failure);
     261              :         }
     262       160164 :     }
     263              : 
     264        47317 :     std::vector<JobHandle> JobSystem::Dispatch(std::size_t jobCount, std::size_t batchSize, const std::function<void(std::size_t, std::size_t)>& job)
     265              :     {
     266        47317 :         std::vector<JobHandle> handles;
     267        47317 :         if (jobCount == 0 || batchSize == 0)
     268              :         {
     269            0 :             return handles;
     270              :         }
     271              : 
     272       207409 :         for (std::size_t i = 0; i < jobCount; i += batchSize)
     273              :         {
     274       160092 :             std::size_t end = std::min(i + batchSize, jobCount);
     275       160092 :             handles.push_back(ScheduleFunction([=]() {
     276       160092 :                 job(i, end);
     277       160092 :             }));
     278              :         }
     279        47317 :         return handles;
     280            0 :     }
     281              : 
     282        47317 :     void JobSystem::Wait(const std::vector<JobHandle>& handles)
     283              :     {
     284       207409 :         for (const auto& handle : handles)
     285              :         {
     286       160092 :             Wait(handle);
     287              :         }
     288        47317 :     }
     289              : 
     290        47318 :     std::size_t JobSystem::WorkerCount() const noexcept
     291              :     {
     292        47318 :         return m_impl ? m_impl->workers.size() : 0;
     293              :     }
     294              : }
        

Generated by: LCOV version 2.0-1