Motr  M0
st_worker.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 /*
24  * Start and stop worker threads
25  */
26 #include "motr/st/st_misc.h"
27 #include "motr/st/st.h"
28 
29 #ifdef __KERNEL__
30 
31 # include <linux/types.h>
32 
33 enum { ST_MAX_THREAD_NAME_LEN = 64 };
34 
35 typedef struct task_struct st_thread_t;
36 static st_thread_t **workers = NULL;
37 
38 static int thread_init(st_thread_t **ret_th,
39  int (*func)(void *), void *args,
40  const char *name_fmt, ...)
41 {
42  int rc;
43  char name_str[ST_MAX_THREAD_NAME_LEN];
44  va_list vl;
45  st_thread_t *th;
46 
47  /* construct the name of this thread*/
48  va_start(vl, name_fmt);
49  rc = vsnprintf(name_str, sizeof(name_str), name_fmt, vl);
50  M0_ASSERT(rc > 0);
51  va_end(vl);
52 
53  /* create and start a thread */
54  th = kthread_create(func, args, "%s", name_str);
55  if (th == NULL) {
56  rc = PTR_ERR(th);
57  } else {
58  rc = 0;
59  wake_up_process(th);
60  *ret_th = th;
61  }
62 
63  return rc;
64 }
65 
66 static int thread_join(st_thread_t *th)
67 {
68  return kthread_stop(th);
69 }
70 
71 static void thread_fini(st_thread_t *th)
72 {
73  return;
74 }
75 
76 void thread_exit(void)
77 {
78  /*
79  * A thread doesn't exit until thread_join is called
80  * here, kthread_should_stop is called to check if this condition
81  * is met
82  */
83 
84  set_current_state(TASK_INTERRUPTIBLE);
85  while (!kthread_should_stop()) {
86  schedule();
87  set_current_state(TASK_INTERRUPTIBLE);
88  }
89  set_current_state(TASK_RUNNING);
90 }
91 
92 #else /* user space*/
93 
94 #include <stdint.h>
95 #include <errno.h>
96 #include <pthread.h>
97 
/* In user space a worker thread is a POSIX thread. */
typedef pthread_t st_thread_t;

/* Array of worker handles, allocated when the workers are started. */
static st_thread_t **workers = NULL;
100 
101 static int thread_init(st_thread_t **ret_th,
102  void* (*func)(void *), void *args, ...)
103 {
104  int rc;
105  st_thread_t *th;
106 
107  th = (st_thread_t *) mem_alloc(sizeof(th));
108  if (th == NULL)
109  return -ENOMEM;
110 
111  rc = pthread_create(th, NULL, func, args);
112  if (rc == 0)
113  *ret_th = th;
114 
115  return -rc;
116 }
117 
/*
 * User-space counterpart of the kernel thread_exit(): does nothing.
 */
void thread_exit(void)
{
	return;
}
122 
123 static int thread_join(st_thread_t *th)
124 {
125  void *result;
126  int rc;
127 
128  if (pthread_join(*th, &result) == 0) {
129  rc = *((int *)result);
130  m0_free(result);
131  return rc;
132  } else
133  return -1;
134 }
135 
136 static void thread_fini(st_thread_t *th)
137 {
138  mem_free(th);
139  return;
140 }
141 
142 #endif
143 
144 #include "lib/thread.h"
145 
146 #ifdef __KERNEL__
147 static int st_worker(void *in)
148 #else
149 static void* st_worker(void *in)
150 #endif
151 {
152  int idx;
153  int nr_rounds;
154  uint64_t start;
155  uint64_t end;
156  pid_t tid;
157  struct st_cfg cfg;
158  int *nr_failed_assertions;
159 
160 #ifndef __KERNEL__
161  struct m0_thread *mthread;
162 
163  MEM_ALLOC_PTR(mthread);
164  if (mthread == NULL)
165  return 0;
166  memset(mthread, 0, sizeof(struct m0_thread));
167 
168  m0_thread_adopt(mthread, st_get_motr());
169 
170 #else
171  int k_sys_return;
173 #endif
174 
175  nr_failed_assertions = m0_alloc(sizeof *nr_failed_assertions);
176  if (nr_failed_assertions == NULL)
177 #ifndef __KERNEL__
178  return NULL;
179 #else
180  return -ENOMEM;
181 #endif
182 
183  /* set tid of this worker thread*/
184  idx = *((int *)in);
185  tid = get_tid();
186  st_set_worker_tid(idx, tid);
187 
188  cfg = st_get_cfg();
189  nr_rounds = cfg.sc_nr_rounds;
190 
191  start = time_now();
192  end = start + cfg.sc_deadline;
193 
194  /* main loop */
195  while(1) {
196  /* TODO: synchronize all threads to start the tests
197  * at the same time
198  */
199 
200  *nr_failed_assertions = st_run(st_get_tests());
201 
202  /*
203  * tests don't continue if the thread runs out of
204  * time or reaches the number of rounds
205  */
206  if (nr_rounds-- == 0 || time_now() > end)
207  break;
208  }
209 
210 #ifndef __KERNEL__
211  m0_thread_shun();
212  mem_free(mthread);
213 #endif
214 
215  /* wait for the thread_join is called*/
216  thread_exit();
217 #ifndef __KERNEL__
218  return nr_failed_assertions;
219 #else
220  k_sys_return = *nr_failed_assertions;
221  m0_free(nr_failed_assertions);
222  return k_sys_return;
223 #endif
224 }
225 
227 {
228  int i;
229  int rc=0;
230  int nr_workers;
231  static int indices[ST_MAX_WORKER_NUM];
232 
233  nr_workers = st_get_nr_workers();
234  if (nr_workers == 0)
235  return -EINVAL;
236 
237  /* start a few test threads */
238  if (workers == NULL) {
239  MEM_ALLOC_ARR(workers, nr_workers);
240  if (workers == NULL)
241  return -ENOMEM;
242  }
243 
244  for (i = 0; i < nr_workers; i++) {
245  indices[i] = i;
247  indices+i, "Client_st_worker_%d", i);
248  if (rc < 0)
249  break;
250  }
251 
252  if (rc < 0)
253  mem_free(workers);
254 
255  return rc;
256 }
257 
259 {
260  int i;
261  int nr_workers;
262  int rc = 0; /* required */
263 
264  nr_workers = st_get_nr_workers();
265 
266  if (nr_workers == 0 || workers == NULL)
267  return rc;
268 
269  for (i = 0; i < nr_workers; i++) {
270  rc = thread_join(workers[i]);
272  }
273 
274  /*free workers*/
275  mem_free(workers);
276  return rc;
277 }
278 
279 /*
280  * Local variables:
281  * c-indentation-style: "K&R"
282  * c-basic-offset: 8
283  * tab-width: 8
284  * fill-column: 80
285  * scroll-step: 1
286  * End:
287  */
static int thread_join(st_thread_t *th)
Definition: st_worker.c:123
static m0_bindex_t indices[ZEROVEC_UT_SEGS_NR]
Definition: zerovec.c:38
pthread_t st_thread_t
Definition: st_worker.c:98
Definition: st.h:104
#define NULL
Definition: misc.h:38
static int thread_init(st_thread_t **ret_th, void *(*func)(void *), void *args,...)
Definition: st_worker.c:101
static void thread_fini(st_thread_t *th)
Definition: st_worker.c:136
int st_stop_workers(void)
Definition: st_worker.c:258
#define MEM_ALLOC_PTR(arr)
Definition: st_misc.h:81
int st_run(const char *test_list_str)
Definition: st.c:389
void thread_exit(void)
Definition: st_worker.c:118
M0_INTERNAL int m0_thread_adopt(struct m0_thread *thread, struct m0 *instance)
Definition: thread.c:127
Definition: ub.c:49
int st_set_worker_tid(int idx, pid_t tid)
Definition: st.c:108
int st_get_nr_workers(void)
Definition: st.c:103
int i
Definition: dir.c:1033
uint64_t sc_deadline
Definition: st.h:117
struct m0 * st_get_motr(void)
Definition: st.c:47
#define M0_ASSERT(cond)
void * m0_alloc(size_t size)
Definition: memory.c:126
static void mem_free(const struct m0_be_btree *btree, struct m0_be_tx *tx, void *ptr)
Definition: btree.c:102
M0_INTERNAL void m0_thread_shun(void)
Definition: thread.c:134
int sc_nr_rounds
Definition: st.h:116
uint64_t time_now(void)
Definition: st_misc.c:69
#define MEM_ALLOC_ARR(arr, nr)
Definition: st_misc.h:80
static void * mem_alloc(const struct m0_be_btree *btree, struct m0_be_tx *tx, m0_bcount_t size, uint64_t zonemask)
Definition: btree.c:116
#define M0_CLIENT_THREAD_ENTER
static int start(struct m0_fom *fom)
Definition: trigger_fom.c:321
const char * st_get_tests()
Definition: st.c:80
static void * st_worker(void *in)
Definition: st_worker.c:149
pid_t get_tid(void)
Definition: st_misc.c:150
struct st_cfg st_get_cfg()
Definition: st.c:62
void m0_free(void *data)
Definition: memory.c:146
int st_start_workers(void)
Definition: st_worker.c:226
int32_t rc
Definition: trigger_fop.h:47
static st_thread_t ** workers
Definition: st_worker.c:99