30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_LIB 130 for (
i = 0;
i <
pool->pp_thread_nr; ++
i) {
150 int thread_nr,
int qlink_nr)
153 int result = thread_nr > 0 && qlink_nr > 0 ? 0 : -EINVAL;
171 for (
i = 0;
i < qlink_nr; ++
i)
174 for (
i = 0;
i < thread_nr; ++
i) {
178 "pool_thread%d",
pool->pp_thread_nr);
183 pool->pp_thread_nr++;
190 int thread_nr,
int qlinks_nr)
193 pool->pp_done =
false;
195 pool->pp_thread_nr = 0;
196 pool->pp_qlinks_nr = qlinks_nr;
211 int (*process)(
void *
item))
219 pool->pp_process = process;
220 pool->pp_next_rc = 0;
221 for (
i = 0;
i <
pool->pp_qlinks_nr; ++
i)
222 pool->pp_qlinks[
i].ql_rc = 0;
225 for (
i = 0;
i <
pool->pp_thread_nr; ++
i)
236 for (
i = 0;
i <
pool->pp_thread_nr; ++
i)
244 pool->pp_qlinks[
i].ql_rc == 0) ? 0 :
M0_ERR(-EINTR);
254 pool->pp_done =
true;
255 for (
i = 0;
i <
pool->pp_thread_nr; ++
i)
258 for (
i = 0;
i <
pool->pp_thread_nr; ++
i)
272 if (pos < pool->pp_qlinks_nr) {
273 pool->pp_qlinks[pos].ql_item =
item;
288 for (
i =
pool->pp_next_rc; i < pool->pp_qlinks_nr; ++
i) {
289 if (
pool->pp_qlinks[
i].ql_rc != 0) {
290 *job =
pool->pp_qlinks[
i].ql_item;
291 *
rc =
pool->pp_qlinks[
i].ql_rc;
292 pool->pp_next_rc = ++
i;
301 #undef M0_TRACE_SUBSYSTEM M0_INTERNAL int m0_parallel_pool_job_add(struct m0_parallel_pool *pool, void *item)
static void parallel_pool_fini(struct m0_parallel_pool *pool, bool join)
#define M0_ALLOC_ARR(arr, nr)
M0_INTERNAL int m0_parallel_pool_rc_next(struct m0_parallel_pool *pool, void **job, int *rc)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
static void pool_thread(struct m0_parallel_pool *pool)
static void pool_threads_fini(struct m0_parallel_pool *pool, bool join)
int m0_thread_join(struct m0_thread *q)
M0_INTERNAL void m0_parallel_pool_start(struct m0_parallel_pool *pool, int(*process)(void *item))
#define M0_LOG(level,...)
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
M0_INTERNAL int m0_parallel_pool_wait(struct m0_parallel_pool *pool)
static struct m0_rpc_item * item
static void parallel_queue_add(struct m0_parallel_queue *queue, struct m0_parallel_queue_link *link)
M0_INTERNAL void m0_parallel_pool_fini(struct m0_parallel_pool *pool)
return M0_ERR(-EOPNOTSUPP)
static int pool_threads_init(struct m0_parallel_pool *pool, int thread_nr, int qlink_nr)
static void parallel_queue_link_init(struct m0_parallel_queue_link *l)
static void parallel_queue_fini(struct m0_parallel_queue *queue)
static struct m0_parallel_queue_link * parallel_queue_get(struct m0_parallel_queue *queue)
void m0_thread_fini(struct m0_thread *q)
M0_INTERNAL void m0_queue_link_init(struct m0_queue_link *ql)
M0_INTERNAL int m0_parallel_pool_init(struct m0_parallel_pool *pool, int thread_nr, int qlinks_nr)
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
static struct m0_clink l[NR]
M0_INTERNAL void m0_parallel_pool_terminate_wait(struct m0_parallel_pool *pool)
static struct m0_pool pool
#define m0_forall(var, nr,...)
M0_INTERNAL struct m0_queue_link * m0_queue_get(struct m0_queue *q)
M0_INTERNAL void m0_queue_put(struct m0_queue *q, struct m0_queue_link *ql)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
M0_INTERNAL size_t m0_queue_length(const struct m0_queue *q)
static void parallel_queue_init(struct m0_parallel_queue *queue)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_queue_init(struct m0_queue *q)
struct m0_queue_link ql_link
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
M0_INTERNAL void m0_queue_fini(struct m0_queue *q)