Line data Source code
1 : /*
2 : * Copyright (C) 2025 aeml
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "jobs/JobSystem.hpp"
19 :
20 : #include <atomic>
21 : #include <condition_variable>
22 : #include <exception>
23 : #include <mutex>
24 : #include <queue>
25 : #include <thread>
26 : #include <unordered_map>
27 :
28 : namespace jobs
29 : {
30 : namespace
31 : {
32 : class FunctionJob final : public IJob
33 : {
34 : public:
35 160162 : explicit FunctionJob(std::function<void()> fn)
36 160162 : : m_fn(std::move(fn))
37 : {
38 160162 : }
39 :
40 160162 : void Execute() override
41 : {
42 160162 : if (m_fn)
43 : {
44 160162 : m_fn();
45 : }
46 160160 : }
47 :
48 : private:
49 : std::function<void()> m_fn;
50 : };
51 : }
52 :
53 : struct JobSystem::Impl
54 : {
55 : std::vector<std::thread> workers;
56 : std::queue<std::unique_ptr<IJob>> jobs;
57 : std::mutex mutex;
58 : std::condition_variable cv;
59 : std::atomic<bool> running{true};
60 : std::atomic<std::size_t> nextId{1};
61 :
62 : struct JobState
63 : {
64 : std::atomic<bool> completed{false};
65 : std::exception_ptr failure;
66 : std::mutex m;
67 : std::condition_variable cv;
68 : };
69 :
70 : std::unordered_map<std::size_t, std::shared_ptr<JobState>> states;
71 : std::unordered_map<std::size_t, std::exception_ptr> completedFailures;
72 : };
73 :
74 46 : JobSystem::JobSystem()
75 46 : : m_impl(std::make_unique<Impl>())
76 : {
77 46 : const auto workerCount = std::max(1u, std::thread::hardware_concurrency());
78 46 : m_impl->workers.reserve(workerCount);
79 :
80 230 : for (unsigned i = 0; i < workerCount; ++i)
81 : {
82 368 : m_impl->workers.emplace_back([impl = m_impl.get()]()
83 : {
84 : for (;;)
85 : {
86 160346 : std::unique_ptr<IJob> job;
87 :
88 : {
89 160346 : std::unique_lock<std::mutex> lock{impl->mutex};
90 160346 : impl->cv.wait(lock, [&]
91 : {
92 305538 : return !impl->running.load() || !impl->jobs.empty();
93 : });
94 :
95 160346 : if (!impl->running.load() && impl->jobs.empty())
96 : {
97 368 : return;
98 : }
99 :
100 160162 : job = std::move(impl->jobs.front());
101 160162 : impl->jobs.pop();
102 160346 : }
103 :
104 160162 : if (job)
105 : {
106 160162 : job->Execute();
107 : }
108 320508 : }
109 : });
110 : }
111 46 : }
112 :
113 92 : JobSystem::~JobSystem()
114 : {
115 46 : if (!m_impl)
116 : {
117 0 : return;
118 : }
119 :
120 : {
121 46 : std::lock_guard<std::mutex> lock{m_impl->mutex};
122 46 : m_impl->running.store(false);
123 46 : }
124 46 : m_impl->cv.notify_all();
125 :
126 230 : for (auto& worker : m_impl->workers)
127 : {
128 184 : if (worker.joinable())
129 : {
130 184 : worker.join();
131 : }
132 : }
133 46 : }
134 :
135 160162 : JobHandle JobSystem::Schedule(std::unique_ptr<IJob> job)
136 : {
137 160162 : if (!job)
138 : {
139 0 : return JobHandle{};
140 : }
141 :
142 : struct WrappedJob : public IJob
143 : {
144 : std::unique_ptr<IJob> inner;
145 : std::shared_ptr<Impl::JobState> state;
146 : Impl* impl;
147 : std::size_t id;
148 160162 : void Execute() override
149 : {
150 160162 : std::exception_ptr failure;
151 : try
152 : {
153 160162 : if (inner)
154 : {
155 160162 : inner->Execute();
156 : }
157 : }
158 2 : catch (...)
159 : {
160 2 : failure = std::current_exception();
161 2 : }
162 :
163 160162 : state->failure = failure;
164 160162 : state->completed.store(true, std::memory_order_release);
165 : {
166 160162 : std::lock_guard<std::mutex> l{state->m};
167 160162 : state->cv.notify_all();
168 160162 : }
169 : // Cleanup state map entry to avoid growth
170 160162 : if (impl)
171 : {
172 160162 : std::lock_guard<std::mutex> lock{impl->mutex};
173 160162 : auto stateIt = impl->states.find(id);
174 160162 : if (stateIt != impl->states.end())
175 : {
176 152952 : if (failure)
177 : {
178 2 : impl->completedFailures[id] = failure;
179 : }
180 152952 : impl->states.erase(stateIt);
181 : }
182 160162 : }
183 160162 : }
184 : };
185 :
186 160162 : JobHandle handle;
187 160162 : handle.id = m_impl->nextId.fetch_add(1);
188 :
189 160162 : auto state = std::make_shared<Impl::JobState>();
190 : {
191 160162 : std::lock_guard<std::mutex> lock{m_impl->mutex};
192 160162 : m_impl->states.emplace(handle.id, state);
193 160162 : auto wrapped = std::make_unique<WrappedJob>();
194 160162 : wrapped->inner = std::move(job);
195 160162 : wrapped->state = state;
196 160162 : wrapped->impl = m_impl.get();
197 160162 : wrapped->id = handle.id;
198 160162 : m_impl->jobs.push(std::move(wrapped));
199 160162 : }
200 160162 : m_impl->cv.notify_one();
201 :
202 160162 : return handle;
203 160162 : }
204 :
205 160162 : JobHandle JobSystem::ScheduleFunction(const std::function<void()>& fn)
206 : {
207 160162 : return Schedule(std::make_unique<FunctionJob>(fn));
208 : }
209 :
210 160162 : void JobSystem::Wait(const JobHandle& handle)
211 : {
212 160162 : if (!m_impl || handle.id == 0)
213 : {
214 105623 : return;
215 : }
216 :
217 160162 : std::shared_ptr<Impl::JobState> state;
218 : {
219 160162 : std::lock_guard<std::mutex> lock{m_impl->mutex};
220 160162 : auto it = m_impl->states.find(handle.id);
221 160162 : if (it == m_impl->states.end())
222 : {
223 105623 : auto failed = m_impl->completedFailures.find(handle.id);
224 105623 : if (failed != m_impl->completedFailures.end())
225 : {
226 0 : auto ex = failed->second;
227 0 : m_impl->completedFailures.erase(failed);
228 0 : std::rethrow_exception(ex);
229 0 : }
230 :
231 : // Already completed and cleaned up.
232 105623 : return;
233 : }
234 54539 : state = it->second;
235 160162 : }
236 :
237 54539 : if (!state)
238 : {
239 0 : return;
240 : }
241 :
242 54539 : auto finishWait = [&]()
243 : {
244 54539 : std::lock_guard<std::mutex> lock{m_impl->mutex};
245 54539 : m_impl->states.erase(handle.id);
246 54539 : m_impl->completedFailures.erase(handle.id);
247 54539 : };
248 :
249 54539 : if (!state->completed.load(std::memory_order_acquire))
250 : {
251 46695 : std::unique_lock<std::mutex> lk{state->m};
252 139634 : state->cv.wait(lk, [&]{ return state->completed.load(std::memory_order_acquire); });
253 46695 : }
254 :
255 54539 : auto failure = state->failure;
256 54539 : finishWait();
257 :
258 54539 : if (failure)
259 : {
260 4 : std::rethrow_exception(failure);
261 : }
262 160164 : }
263 :
264 47317 : std::vector<JobHandle> JobSystem::Dispatch(std::size_t jobCount, std::size_t batchSize, const std::function<void(std::size_t, std::size_t)>& job)
265 : {
266 47317 : std::vector<JobHandle> handles;
267 47317 : if (jobCount == 0 || batchSize == 0)
268 : {
269 0 : return handles;
270 : }
271 :
272 207409 : for (std::size_t i = 0; i < jobCount; i += batchSize)
273 : {
274 160092 : std::size_t end = std::min(i + batchSize, jobCount);
275 160092 : handles.push_back(ScheduleFunction([=]() {
276 160092 : job(i, end);
277 160092 : }));
278 : }
279 47317 : return handles;
280 0 : }
281 :
282 47317 : void JobSystem::Wait(const std::vector<JobHandle>& handles)
283 : {
284 207409 : for (const auto& handle : handles)
285 : {
286 160092 : Wait(handle);
287 : }
288 47317 : }
289 :
290 47318 : std::size_t JobSystem::WorkerCount() const noexcept
291 : {
292 47318 : return m_impl ? m_impl->workers.size() : 0;
293 : }
294 : }
|