Motr  M0
thread_pool.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "ut/ut.h"
24 #include "lib/errno.h"
25 #include "lib/tlist.h"
26 #include "lib/atomic.h"
27 #include "lib/thread_pool.h"
28 
29 static const int ROUNDS_NR = 200;
30 static const int THREAD_NR = 10;
31 static const int QLINKS_NR = 10;
32 
33 static uint64_t test_counts = 0;
34 static struct m0_atomic64 counts;
35 
36 struct tpool_test {
37  uint64_t t_count; /* payload */
38  int t_rc;
40  uint64_t t_magic;
41 };
42 M0_TL_DESCR_DEFINE(tpt, "tpt-s", static, struct tpool_test, t_linkage,
43  t_magic, 0x1111111111111111, 0x1111111111111111);
44 M0_TL_DEFINE(tpt, static, struct tpool_test);
45 struct tpool_test test[] = {
46  { .t_count = 1 }, { .t_count = 2 }, { .t_count = 3 }, { .t_count = 4 },
47  { .t_count = 5 }, { .t_count = 6 }, { .t_count = 7 }, { .t_count = 8 },
48 };
49 struct tpool_test error_test[] = {
50  { .t_count = 1 }, { .t_count = 2, .t_rc = -EINTR, },
51  { .t_count = 3 }, { .t_count = 4, .t_rc = -ENOMEM, },
52  { .t_count = 5 }, { .t_count = 6, .t_rc = -E2BIG, },
53  { .t_count = 7 }, { .t_count = 8, .t_rc = -EINTR, },
54 };
55 
56 static int thread_pool_process(void *item)
57 {
58  struct tpool_test *t = (struct tpool_test *)item;
59 
60  if (t->t_rc == 0)
61  m0_atomic64_add(&counts, t->t_count);
62 
63  return t->t_rc;
64 }
65 
67 {
68  int rc;
69 
71  M0_UT_ASSERT(rc == 0);
72 }
73 
75 {
76  int rc;
77 
79  M0_UT_ASSERT(rc == 0);
80 }
81 
83 {
86 }
87 
88 static void feed(struct m0_parallel_pool *pool, bool err_test)
89 {
90  struct tpool_test *t = !err_test ? test : error_test;
91  int sz = !err_test ? ARRAY_SIZE(test)
93  int rc;
94  int i;
95 
96  for (i = 0; i < sz; ++i) {
98  M0_UT_ASSERT(rc == 0);
99  test_counts += !err_test ? t[i].t_count : (t[i].t_rc == 0 ?
100  t[i].t_count : 0);
101  }
102 }
103 
105 {
106  int rounds = ROUNDS_NR;
107  int rc;
108 
109  for (; rounds > 0; --rounds) {
110  feed(pool, false);
113  M0_UT_ASSERT(rc == 0);
115  }
116 
117 }
118 
120 {
121  int rounds = ROUNDS_NR;
122 
123  for (; rounds > 0; --rounds) {
124  int i;
125  int rc;
126  struct m0_tl items;
127  struct tpool_test *item;
128 
129  tpt_tlist_init(&items);
130 
131  for (i = 0; i < ARRAY_SIZE(test); ++i) {
132  tpt_tlink_init_at(&test[i], &items);
133  test_counts += test[i].t_count;
134  }
135 
136 
137  rc = M0_PARALLEL_FOR(tpt, pool, &items, thread_pool_process);
138  M0_UT_ASSERT(rc == 0);
140 
141  m0_tl_teardown(tpt, &items, item);
142  tpt_tlist_fini(&items);
143  }
144 }
145 
147 {
148  struct tpool_test *job = NULL;
149  int rounds = ROUNDS_NR;
150  int rc;
151 
152  for (; rounds > 0; --rounds) {
153  feed(pool, true);
156  M0_UT_ASSERT(rc == -EINTR);
158  while (m0_parallel_pool_rc_next(pool, (void **)&job, &rc) > 0) {
159  M0_UT_ASSERT(job != NULL);
160  M0_UT_ASSERT(rc == job->t_rc);
161  }
162  }
163 
164 }
165 
167 {
168  struct m0_parallel_pool pool = {};
169  struct m0_parallel_pool small_pool = {};
170 
171  m0_atomic64_set(&counts, 0);
172  test_counts = 0;
173 
179 
180  small_thread_pool_init(&small_pool);
181  parallel_for_pool_test(&small_pool);
182  thread_pool_fini(&small_pool);
183 }
184 
185 /*
186  * Local variables:
187  * c-indentation-style: "K&R"
188  * c-basic-offset: 8
189  * tab-width: 8
190  * fill-column: 80
191  * scroll-step: 1
192  * End:
193  */
M0_INTERNAL int m0_parallel_pool_job_add(struct m0_parallel_pool *pool, void *item)
Definition: thread_pool.c:264
M0_INTERNAL int m0_parallel_pool_rc_next(struct m0_parallel_pool *pool, void **job, int *rc)
Definition: thread_pool.c:281
static void thread_pool_init(struct m0_parallel_pool *pool)
Definition: thread_pool.c:66
#define NULL
Definition: misc.h:38
static int thread_pool_process(void *item)
Definition: thread_pool.c:56
static void thread_pool_fini(struct m0_parallel_pool *pool)
Definition: thread_pool.c:82
M0_INTERNAL void m0_parallel_pool_start(struct m0_parallel_pool *pool, int(*process)(void *item))
Definition: thread_pool.c:210
static const int QLINKS_NR
Definition: thread_pool.c:31
uint64_t t_magic
Definition: tlist.h:296
#define M0_PARALLEL_FOR(name, pool, list, process)
Definition: thread_pool.h:116
M0_INTERNAL int m0_parallel_pool_wait(struct m0_parallel_pool *pool)
Definition: thread_pool.c:229
static struct m0_rpc_item * item
Definition: item.c:56
struct tpool_test test[]
Definition: thread_pool.c:45
static void parallel_for_pool_test(struct m0_parallel_pool *pool)
Definition: thread_pool.c:119
uint64_t t_count
Definition: thread_pool.c:37
M0_INTERNAL void m0_parallel_pool_fini(struct m0_parallel_pool *pool)
Definition: thread_pool.c:204
int i
Definition: dir.c:1033
static const int THREAD_NR
Definition: thread_pool.c:30
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
static struct m0_thread t[8]
Definition: service_ut.c:1230
Definition: tlist.h:251
M0_INTERNAL int m0_parallel_pool_init(struct m0_parallel_pool *pool, int thread_nr, int qlinks_nr)
Definition: thread_pool.c:189
M0_TL_DESCR_DEFINE(tpt, "tpt-s", static, struct tpool_test, t_linkage, t_magic, 0x1111111111111111, 0x1111111111111111)
struct tpool_test error_test[]
Definition: thread_pool.c:49
M0_INTERNAL void m0_parallel_pool_terminate_wait(struct m0_parallel_pool *pool)
Definition: thread_pool.c:247
static struct m0_pool pool
Definition: iter_ut.c:58
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
static void simple_thread_pool_test(struct m0_parallel_pool *pool)
Definition: thread_pool.c:104
static const int ROUNDS_NR
Definition: thread_pool.c:29
M0_TL_DEFINE(tpt, static, struct tpool_test)
Definition: list.c:42
static void feed(struct m0_parallel_pool *pool, bool err_test)
Definition: thread_pool.c:88
static struct m0_atomic64 counts
Definition: thread_pool.c:34
static uint64_t test_counts
Definition: thread_pool.c:33
void m0_ut_lib_thread_pool_test(void)
Definition: thread_pool.c:166
struct m0_tlink t_linkage
Definition: thread_pool.c:39
static void small_thread_pool_init(struct m0_parallel_pool *pool)
Definition: thread_pool.c:74
uint64_t t_magic
Definition: thread_pool.c:40
static void m0_atomic64_add(struct m0_atomic64 *a, int64_t num)
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define M0_UT_ASSERT(a)
Definition: ut.h:46
static void simple_thread_pool_error_test(struct m0_parallel_pool *pool)
Definition: thread_pool.c:146
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)