Motr  M0
thread_pool.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_LIB
31 #include "lib/trace.h"
32 #include "lib/thread_pool.h"
33 #include "lib/memory.h"
34 #include "lib/assert.h"
35 #include "lib/errno.h"
36 
38  PPS_IDLE = 1,
42 };
43 
44 /* queue */
45 
46 
49  int ql_rc;
50  void *ql_item;
51 };
52 
55  struct m0_mutex pq_lock;
56 };
57 
58 static struct m0_parallel_queue_link *
60 {
61  struct m0_parallel_queue_link *link;
62 
63  m0_mutex_lock(&queue->pq_lock);
64  link = (struct m0_parallel_queue_link *)m0_queue_get(&queue->pq_queue);
65  m0_mutex_unlock(&queue->pq_lock);
66 
67  return link;
68 }
69 
71  struct m0_parallel_queue_link *link)
72 {
73  m0_mutex_lock(&queue->pq_lock);
74  m0_queue_put(&queue->pq_queue, &link->ql_link);
75  m0_mutex_unlock(&queue->pq_lock);
76 }
77 
79 {
80  m0_queue_init(&queue->pq_queue);
81  m0_mutex_init(&queue->pq_lock);
82 }
83 
85 {
86  m0_mutex_fini(&queue->pq_lock);
87  m0_queue_fini(&queue->pq_queue);
88 }
89 
91 {
92  m0_queue_link_init(&l->ql_link);
93  l->ql_item = NULL;
94 }
95 
96 /* pool */
97 
98 
99 static void pool_thread(struct m0_parallel_pool *pool)
100 {
101  struct m0_parallel_queue_link *qlink;
102  void *item;
103 
104  while (true) {
105  m0_semaphore_down(&pool->pp_ready);
106 
107  M0_PRE(M0_IN(pool->pp_state, (PPS_BUSY, PPS_TERMINATING)));
108  if (pool->pp_done)
109  break;
110 
111  while (true) {
112  qlink = parallel_queue_get(pool->pp_queue);
113  if (qlink == NULL)
114  break;
115 
116  item = qlink->ql_item;
117  qlink->ql_rc = pool->pp_process(item);
118  M0_LOG(M0_DEBUG, "job: %p, ql_rc: %d", item,
119  qlink->ql_rc);
120  }
121 
122  m0_semaphore_up(&pool->pp_sync);
123  }
124 }
125 
126 static void pool_threads_fini(struct m0_parallel_pool *pool, bool join)
127 {
128  int i;
129 
130  for (i = 0; i < pool->pp_thread_nr; ++i) {
131  if (join)
132  m0_thread_join(&pool->pp_threads[i]);
133  m0_thread_fini(&pool->pp_threads[i]);
134  }
135 
136  parallel_queue_fini(pool->pp_queue);
137  m0_free(pool->pp_threads);
138  m0_free(pool->pp_qlinks);
139  m0_free(pool->pp_queue);
140 }
141 
142 static void parallel_pool_fini(struct m0_parallel_pool *pool, bool join)
143 {
144  pool_threads_fini(pool, join);
145  m0_semaphore_fini(&pool->pp_sync);
146  m0_semaphore_fini(&pool->pp_ready);
147 }
148 
150  int thread_nr, int qlink_nr)
151 {
152  int i;
153  int result = thread_nr > 0 && qlink_nr > 0 ? 0 : -EINVAL;
154 
155  if (result != 0)
156  return result;
157 
158  M0_ALLOC_ARR(pool->pp_threads, thread_nr);
159  M0_ALLOC_ARR(pool->pp_qlinks, qlink_nr);
160  M0_ALLOC_PTR(pool->pp_queue);
161  if (pool->pp_threads == NULL || pool->pp_qlinks == NULL ||
162  pool->pp_queue == NULL) {
163  m0_free(pool->pp_threads);
164  m0_free(pool->pp_qlinks);
165  m0_free(pool->pp_queue);
166  return -ENOMEM;
167  }
168 
169  parallel_queue_init(pool->pp_queue);
170 
171  for (i = 0; i < qlink_nr; ++i)
172  parallel_queue_link_init(&pool->pp_qlinks[i]);
173 
174  for (i = 0; i < thread_nr; ++i) {
175  result = M0_THREAD_INIT(&pool->pp_threads[pool->pp_thread_nr],
176  struct m0_parallel_pool*, NULL,
177  &pool_thread, pool,
178  "pool_thread%d", pool->pp_thread_nr);
179  if (result != 0) {
180  parallel_pool_fini(pool, true);
181  break;
182  }
183  pool->pp_thread_nr++;
184  }
185 
186  return result;
187 }
188 
190  int thread_nr, int qlinks_nr)
191 {
192  pool->pp_state = PPS_IDLE;
193  pool->pp_done = false;
194  pool->pp_process = NULL;
195  pool->pp_thread_nr = 0;
196  pool->pp_qlinks_nr = qlinks_nr;
197 
198  m0_semaphore_init(&pool->pp_ready, 0);
199  m0_semaphore_init(&pool->pp_sync, 0);
200 
201  return pool_threads_init(pool, thread_nr, qlinks_nr);
202 }
203 
204 M0_INTERNAL void m0_parallel_pool_fini(struct m0_parallel_pool *pool)
205 {
206  M0_PRE(pool->pp_state == PPS_TERMINATED);
207  parallel_pool_fini(pool, false);
208 }
209 
211  int (*process)(void *item))
212 {
213  int i;
214 
215  M0_PRE(pool->pp_state == PPS_IDLE);
216  M0_PRE(pool->pp_process == NULL);
217 
218  pool->pp_state = PPS_BUSY;
219  pool->pp_process = process;
220  pool->pp_next_rc = 0;
221  for (i = 0; i < pool->pp_qlinks_nr; ++i)
222  pool->pp_qlinks[i].ql_rc = 0;
223 
224 
225  for (i = 0; i < pool->pp_thread_nr; ++i)
226  m0_semaphore_up(&pool->pp_ready);
227 }
228 
230 {
231  int i;
232 
233  M0_PRE(pool->pp_state == PPS_BUSY);
234  M0_PRE(pool->pp_process != NULL);
235 
236  for (i = 0; i < pool->pp_thread_nr; ++i)
237  m0_semaphore_down(&pool->pp_sync);
238 
239  M0_ASSERT(pool->pp_process != NULL);
240  pool->pp_state = PPS_IDLE;
241  pool->pp_process = NULL;
242 
243  return m0_forall(i, pool->pp_qlinks_nr,
244  pool->pp_qlinks[i].ql_rc == 0) ? 0 : M0_ERR(-EINTR);
245 }
246 
248 {
249  int i;
250 
251  M0_PRE(pool->pp_state == PPS_IDLE);
252  pool->pp_state = PPS_TERMINATING;
253 
254  pool->pp_done = true;
255  for (i = 0; i < pool->pp_thread_nr; ++i)
256  m0_semaphore_up(&pool->pp_ready);
257 
258  for (i = 0; i < pool->pp_thread_nr; ++i)
259  m0_thread_join(&pool->pp_threads[i]);
260 
261  pool->pp_state = PPS_TERMINATED;
262 }
263 
265  void *item)
266 {
267  int pos;
268 
269  M0_PRE(pool->pp_state == PPS_IDLE);
270 
271  pos = m0_queue_length(&pool->pp_queue->pq_queue);
272  if (pos < pool->pp_qlinks_nr) {
273  pool->pp_qlinks[pos].ql_item = item;
274  parallel_queue_add(pool->pp_queue, &pool->pp_qlinks[pos]);
275  return 0;
276  }
277 
278  return M0_ERR(-EFBIG);
279 }
280 
282  void **job, int *rc)
283 {
284  int i;
285 
286  M0_PRE(pool->pp_state == PPS_IDLE);
287 
288  for (i = pool->pp_next_rc; i < pool->pp_qlinks_nr; ++i) {
289  if (pool->pp_qlinks[i].ql_rc != 0) {
290  *job = pool->pp_qlinks[i].ql_item;
291  *rc = pool->pp_qlinks[i].ql_rc;
292  pool->pp_next_rc = ++i;
293  return +1;
294  }
295  }
296 
297  return 0;
298 }
299 
300 
301 #undef M0_TRACE_SUBSYSTEM
302 
305 /*
306  * Local variables:
307  * c-indentation-style: "K&R"
308  * c-basic-offset: 8
309  * tab-width: 8
310  * fill-column: 80
311  * scroll-step: 1
312  * End:
313  */
314 /*
315  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
316  */
M0_INTERNAL int m0_parallel_pool_job_add(struct m0_parallel_pool *pool, void *item)
Definition: thread_pool.c:264
static void parallel_pool_fini(struct m0_parallel_pool *pool, bool join)
Definition: thread_pool.c:142
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_parallel_pool_rc_next(struct m0_parallel_pool *pool, void **job, int *rc)
Definition: thread_pool.c:281
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
#define NULL
Definition: misc.h:38
static void pool_thread(struct m0_parallel_pool *pool)
Definition: thread_pool.c:99
static void pool_threads_fini(struct m0_parallel_pool *pool, bool join)
Definition: thread_pool.c:126
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
M0_INTERNAL void m0_parallel_pool_start(struct m0_parallel_pool *pool, int(*process)(void *item))
Definition: thread_pool.c:210
#define M0_LOG(level,...)
Definition: trace.h:167
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
Definition: queue.h:43
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL int m0_parallel_pool_wait(struct m0_parallel_pool *pool)
Definition: thread_pool.c:229
static struct m0_rpc_item * item
Definition: item.c:56
static void parallel_queue_add(struct m0_parallel_queue *queue, struct m0_parallel_queue_link *link)
Definition: thread_pool.c:70
Definition: beck.c:80
M0_INTERNAL void m0_parallel_pool_fini(struct m0_parallel_pool *pool)
Definition: thread_pool.c:204
int i
Definition: dir.c:1033
return M0_ERR(-EOPNOTSUPP)
#define M0_ASSERT(cond)
m0_parallel_pool_state
Definition: thread_pool.c:37
static int pool_threads_init(struct m0_parallel_pool *pool, int thread_nr, int qlink_nr)
Definition: thread_pool.c:149
static void parallel_queue_link_init(struct m0_parallel_queue_link *l)
Definition: thread_pool.c:90
static void parallel_queue_fini(struct m0_parallel_queue *queue)
Definition: thread_pool.c:84
static struct m0_parallel_queue_link * parallel_queue_get(struct m0_parallel_queue *queue)
Definition: thread_pool.c:59
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
M0_INTERNAL void m0_queue_link_init(struct m0_queue_link *ql)
Definition: queue.c:71
M0_INTERNAL int m0_parallel_pool_init(struct m0_parallel_pool *pool, int thread_nr, int qlinks_nr)
Definition: thread_pool.c:189
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static struct m0_clink l[NR]
Definition: chan.c:37
M0_INTERNAL void m0_parallel_pool_terminate_wait(struct m0_parallel_pool *pool)
Definition: thread_pool.c:247
static struct m0_pool pool
Definition: iter_ut.c:58
#define m0_forall(var, nr,...)
Definition: misc.h:112
M0_INTERNAL struct m0_queue_link * m0_queue_get(struct m0_queue *q)
Definition: queue.c:112
M0_INTERNAL void m0_queue_put(struct m0_queue *q, struct m0_queue_link *ql)
Definition: queue.c:131
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
M0_INTERNAL size_t m0_queue_length(const struct m0_queue *q)
Definition: queue.c:100
static void parallel_queue_init(struct m0_parallel_queue *queue)
Definition: thread_pool.c:78
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_queue_init(struct m0_queue *q)
Definition: queue.c:53
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
struct m0_mutex pq_lock
Definition: thread_pool.c:55
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
M0_INTERNAL void m0_queue_fini(struct m0_queue *q)
Definition: queue.c:59
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
int32_t rc
Definition: trigger_fop.h:47
struct m0_queue pq_queue
Definition: thread_pool.c:54