Motr  M0
queue.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
31 #include "lib/trace.h"
32 
33 #include "be/queue.h"
34 
35 #include "lib/memory.h" /* M0_ALLOC_PTR */
36 #include "lib/semaphore.h" /* m0_semaphore */
37 #include "lib/atomic.h" /* m0_atomic64 */
38 #include "lib/arith.h" /* m0_rnd64 */
39 #include "lib/misc.h" /* m0_reduce */
40 #include "lib/buf.h" /* m0_buf_eq */
41 
42 #include "ut/threads.h" /* m0_ut_theads_start */
43 #include "ut/ut.h" /* M0_UT_ASSERT */
44 
45 #include "be/tx_credit.h" /* M0_BE_TX_CREDIT */
46 #include "be/op.h" /* M0_BE_OP_SYNC */
47 
48 
62 };
63 
65  uint64_t butc_q_size_max;
66  uint64_t butc_producers;
67  uint64_t butc_consumers;
68  uint64_t butc_items_nr;
69  uint64_t butc_seed;
70 };
71 
73  uint64_t butr_put_before;
74  uint64_t butr_put_after;
75  uint64_t butr_get_before;
76  uint64_t butr_get_after;
78 };
79 
81  void *buqd_user;
84 };
85 
89  /* producer increments and takes butx_data[] with the index returned */
91  /* logical clock to check queue operations orderding */
95 };
96 
99  /*
100  * Start barrier to launch all threads as close to each other as
101  * possible.
102  */
105  /*
106  * Thread index, starts from 0 for producers and starts from 0 for
107  * consumers.
108  */
109  uint64_t butqp_index;
110  /* Number of items to put/get to/from the queue. */
111  uint64_t butqp_items_nr;
112  /* just for debugging purposes */
117 };
118 
119 #define BE_UT_QUEUE_TEST(q_size_max, producers, consumers, items_nr) \
120 { \
121  .butc_q_size_max = (q_size_max), \
122  .butc_producers = (producers), \
123  .butc_consumers = (consumers), \
124  .butc_items_nr = (items_nr) \
125 }
126 
128  [BE_UT_QUEUE_1_1_1] = BE_UT_QUEUE_TEST( 1, 1, 1, 1),
129  [BE_UT_QUEUE_2_1_1] = BE_UT_QUEUE_TEST( 2, 1, 1, 10000),
130  [BE_UT_QUEUE_100_1_1] = BE_UT_QUEUE_TEST(100, 1, 1, 10000),
131  [BE_UT_QUEUE_100_1_10] = BE_UT_QUEUE_TEST(100, 1, 10, 10000),
132  [BE_UT_QUEUE_100_10_1] = BE_UT_QUEUE_TEST(100, 10, 1, 10000),
133  [BE_UT_QUEUE_100_10_10] = BE_UT_QUEUE_TEST(100, 10, 10, 10000),
134  [BE_UT_QUEUE_10_100_1] = BE_UT_QUEUE_TEST( 10, 100, 1, 10000),
135  [BE_UT_QUEUE_10_100_5] = BE_UT_QUEUE_TEST( 10, 100, 5, 10000),
136  [BE_UT_QUEUE_10_1_100] = BE_UT_QUEUE_TEST( 10, 1, 100, 10000),
137  [BE_UT_QUEUE_10_5_100] = BE_UT_QUEUE_TEST( 10, 5, 100, 10000),
138  [BE_UT_QUEUE_10_100_100] = BE_UT_QUEUE_TEST( 10, 100, 100, 10000),
139 };
140 
141 #undef BE_UT_QUEUE_TEST
142 
144  struct be_ut_queue_data *data)
145 {
146  return (struct be_ut_queue_data *)data->buqd_user - ctx->butx_data;
147 }
148 
150  struct be_ut_queue_ctx *ctx)
151 {
152  struct be_ut_queue_data data;
153  struct m0_buf buf;
154  bool result;
155 
156  result = M0_BE_QUEUE_PEEK(ctx->butx_bq, &data);
157  if (result) {
158  ++param->butqp_peeks_successful;
159  buf = M0_BUF_INIT_PTR(&ctx->butx_data[
162  } else {
163  ++param->butqp_peeks_unsuccessful;
164  }
165 }
166 
167 static void be_ut_queue_thread(void *_param)
168 {
169  struct be_ut_queue_data data;
170  struct be_ut_queue_thread_param *param = _param;
171  struct be_ut_queue_ctx *ctx = param->butqp_ctx;
172  struct m0_be_queue *bq = ctx->butx_bq;
173  struct m0_be_op *op;
174  uint64_t i;
175  uint64_t index;
176  uint64_t before;
177  bool successful;
178 
179  M0_ALLOC_PTR(op);
180  M0_UT_ASSERT(op != NULL);
181  m0_be_op_init(op);
182  m0_semaphore_down(&param->butqp_sem_start);
183  for (i = 0; i < param->butqp_items_nr; ++i) {
185  if (param->butqp_is_producer) {
186  before = m0_atomic64_add_return(&ctx->butx_clock, 1);
187  m0_be_queue_lock(bq);
188  index = m0_atomic64_add_return(&ctx->butx_pos, 1) - 1;
190  M0_BE_QUEUE_PUT(bq, op, &ctx->butx_data[index]);
191  m0_be_queue_unlock(bq);
192  m0_be_op_wait(op);
193  ctx->butx_result[index].butr_put_before = before;
194  ctx->butx_result[index].butr_put_after =
195  m0_atomic64_add_return(&ctx->butx_clock, 1);
196  if (index == ctx->butx_cfg->butc_items_nr - 1) {
197  m0_be_queue_lock(bq);
198  m0_be_queue_end(bq);
199  m0_be_queue_unlock(bq);
200  }
201  } else {
202  M0_SET0(&data);
203  successful = param->butqp_index % 2 == 0;
204  before = m0_atomic64_add_return(&ctx->butx_clock, 1);
205  m0_be_queue_lock(bq);
206  M0_BE_QUEUE_GET(bq, op, &data, &successful);
208  m0_be_queue_unlock(bq);
209  m0_be_op_wait(op);
210  if (!successful) {
211  ++param->butqp_gets_unsuccessful;
212  continue;
213  }
214  M0_ASSERT(param->butqp_gets_unsuccessful == 0);
215  ++param->butqp_gets_successful;
217  M0_UT_ASSERT(!ctx->butx_result[index].butr_checked);
218  ctx->butx_result[index].butr_checked = true;
219  ctx->butx_result[index].butr_get_before = before;
220  ctx->butx_result[index].butr_get_after =
221  m0_atomic64_add_return(&ctx->butx_clock, 1);
222  }
223  }
224  m0_be_op_fini(op);
225  m0_free(op);
226 }
227 
235 static void be_ut_queue_with_cfg(struct be_ut_queue_cfg *test_cfg)
236 {
237  struct m0_ut_threads_descr *td;
239  struct m0_be_queue_cfg bq_cfg = {
240  .bqc_q_size_max = test_cfg->butc_q_size_max,
241  .bqc_producers_nr_max = test_cfg->butc_producers,
242  .bqc_consumers_nr_max = test_cfg->butc_consumers,
243  .bqc_item_length = sizeof(struct be_ut_queue_data),
244  };
245  struct be_ut_queue_ctx *ctx;
246  struct m0_be_queue *bq;
247  uint64_t threads_nr;
248  uint64_t items_nr = test_cfg->butc_items_nr;
249  uint64_t i;
250  uint64_t seed = test_cfg->butc_seed;
251  uint64_t divisor;
252  uint64_t remainder;
253  struct be_ut_queue_result *r;
254  int rc;
255 
256  M0_ALLOC_PTR(bq);
257  M0_ASSERT(bq != NULL);
258  rc = m0_be_queue_init(bq, &bq_cfg);
259  M0_ASSERT_INFO(rc == 0, "rc=%d", rc);
260  M0_ALLOC_PTR(ctx);
261  M0_UT_ASSERT(ctx != NULL);
262  ctx->butx_cfg = test_cfg;
263  ctx->butx_bq = bq;
264  m0_atomic64_set(&ctx->butx_pos, 0);
265  m0_atomic64_set(&ctx->butx_clock, 0);
266  M0_ALLOC_ARR(ctx->butx_data, items_nr);
267  M0_UT_ASSERT(ctx->butx_data != NULL);
268  M0_ALLOC_ARR(ctx->butx_result, items_nr);
269  M0_UT_ASSERT(ctx->butx_result != NULL);
270  for (i = 0; i < items_nr; ++i) {
271  ctx->butx_data[i] = (struct be_ut_queue_data){
272  .buqd_user = &ctx->butx_data[i],
273  .buqd_credit =
274  M0_BE_TX_CREDIT(m0_rnd64(&seed) % 0x100 + 1,
275  m0_rnd64(&seed) % 0x100 + 1),
276  .buqd_payload_size = m0_rnd64(&seed) % 0x1000 + 1,
277  };
278  }
279  threads_nr = test_cfg->butc_producers + test_cfg->butc_consumers;
280  M0_ALLOC_ARR(params, threads_nr);
281  for (i = 0; i < threads_nr; ++i) {
282  params[i].butqp_ctx = ctx;
283  m0_semaphore_init(&params[i].butqp_sem_start, 0);
284  params[i].butqp_is_producer = i < test_cfg->butc_producers;
285  if (params[i].butqp_is_producer) {
286  params[i].butqp_index = i;
287  divisor = test_cfg->butc_producers;
288  } else {
289  params[i].butqp_index = i - test_cfg->butc_producers;
290  divisor = test_cfg->butc_consumers;
291  }
292  remainder = items_nr % divisor;
293  params[i].butqp_items_nr = items_nr / divisor +
294  (remainder == 0 ?
295  0 : params[i].butqp_index < remainder) +
296  !params[i].butqp_is_producer;
297  }
298  /* all producer threads would put items_nr items to the queue */
299  M0_UT_ASSERT(m0_reduce(j, test_cfg->butc_producers,
300  0, + params[j].butqp_items_nr) == items_nr);
301  /*
302  * All consumer threads would try to get items_nr + butc_consumers items
303  * from the queue. Only items_nr M0_BE_QUEUE_GET()s would be
304  * successful, the rest would be unsuccessful.
305  */
306  M0_UT_ASSERT(m0_reduce(j, test_cfg->butc_consumers,
307  0, + params[test_cfg->butc_producers +
308  j].butqp_items_nr) ==
309  items_nr + test_cfg->butc_consumers);
310 
311  M0_ALLOC_PTR(td);
312  M0_UT_ASSERT(td != NULL);
314  m0_ut_threads_start(td, threads_nr, params, sizeof(params[0]));
315  for (i = 0; i < threads_nr; ++i)
316  m0_semaphore_up(&params[i].butqp_sem_start);
317  /* work is done sometime around here */
318  m0_ut_threads_stop(td);
319  m0_free(td);
320 
321  for (i = 0; i < threads_nr; ++i)
322  m0_semaphore_fini(&params[i].butqp_sem_start);
323 
324  r = ctx->butx_result;
325  /* M0_BE_QUEUE_GET() has been called successfully for items_nr items */
326  M0_UT_ASSERT(m0_reduce(j, test_cfg->butc_consumers,
327  0, + params[test_cfg->butc_producers +
328  j].butqp_gets_successful) ==
329  items_nr);
330  /*
331  * There has been exactly butc_consumers unsuccessful M0_BE_QUEUE_GET()
332  * calls.
333  */
334  M0_UT_ASSERT(m0_reduce(j, test_cfg->butc_consumers,
335  0, + params[test_cfg->butc_producers +
336  j].butqp_gets_unsuccessful) ==
337  test_cfg->butc_consumers);
338  /* that each item is returned by m0_be_queue_get() exactly once */
339  M0_UT_ASSERT(m0_forall(j, items_nr, r[j].butr_checked));
340  /* at least one m0_be_queue_peek() is supposed to fail in each thread */
342  test_cfg->butc_producers +
343  test_cfg->butc_consumers,
344  params[j].butqp_peeks_unsuccessful > 0));
345  /* happened-before relations */
346  M0_UT_ASSERT(m0_forall(j, items_nr,
347  r[j].butr_put_before < r[j].butr_get_after));
348  M0_UT_ASSERT(m0_forall(j, items_nr - 1,
349  r[j].butr_put_before < r[j + 1].butr_put_after));
350  M0_UT_ASSERT(m0_forall(j, items_nr - 1,
351  r[j].butr_get_before < r[j + 1].butr_get_after));
352 
353  m0_free(params);
354  m0_free(ctx->butx_result);
355  m0_free(ctx->butx_data);
356  m0_free(ctx);
357 
358  m0_be_queue_fini(bq);
359  m0_free(bq);
360 }
361 
363 {
366 }
367 
379 
381 {
382  const int MAX = 10;
383  struct be_ut_queue_cfg *test_cfg;
384  int i;
385  int j;
386  int k;
387 
388  M0_ALLOC_PTR(test_cfg);
389  for (i = 1; i <= MAX; ++i)
390  for (j = 1; j <= MAX; ++j)
391  for (k = 1; k <= MAX; ++k) {
392  *test_cfg = (struct be_ut_queue_cfg){
393  .butc_q_size_max = i,
394  .butc_producers = j,
395  .butc_consumers = k,
396  .butc_items_nr = 100,
397  .butc_seed = i * 100 + j * 10 + k,
398  };
399  be_ut_queue_with_cfg(test_cfg);
400  }
401  m0_free(test_cfg);
402 }
403 
404 
405 #undef M0_TRACE_SUBSYSTEM
406 
409 /*
410  * Local variables:
411  * c-indentation-style: "K&R"
412  * c-basic-offset: 8
413  * tab-width: 8
414  * fill-column: 80
415  * scroll-step: 1
416  * End:
417  */
418 /*
419  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
420  */
be_ut_queue_test
Definition: queue.c:49
M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
Definition: queue.c:195
static void be_ut_queue(enum be_ut_queue_test test)
Definition: queue.c:362
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static void be_ut_queue_thread(void *_param)
Definition: queue.c:167
void m0_be_ut_queue_10_100_100(void)
Definition: queue.c:378
#define M0_BE_QUEUE_PUT(bq, op, ptr)
Definition: queue.h:236
#define NULL
Definition: misc.h:38
uint64_t butqp_gets_successful
Definition: queue.c:115
void m0_be_ut_queue_from_1_to_10(void)
Definition: queue.c:380
M0_INTERNAL bool m0_buf_eq(const struct m0_buf *x, const struct m0_buf *y)
Definition: buf.c:90
#define M0_BE_QUEUE_GET(bq, op, ptr, successful)
Definition: queue.h:238
void m0_be_ut_queue_100_1_10(void)
Definition: queue.c:371
struct m0_semaphore butqp_sem_start
Definition: queue.c:103
struct m0_bufvec data
Definition: di.c:40
uint64_t butr_get_before
Definition: queue.c:75
struct be_ut_queue_result * butx_result
Definition: queue.c:94
#define m0_exists(var, nr,...)
Definition: misc.h:134
void m0_be_ut_queue_1_1_1(void)
Definition: queue.c:368
uint64_t m0_bcount_t
Definition: types.h:77
m0_bcount_t buqd_payload_size
Definition: queue.c:83
struct be_ut_queue_cfg * butx_cfg
Definition: queue.c:87
#define M0_SET0(obj)
Definition: misc.h:64
bool butr_checked
Definition: queue.c:77
void m0_be_ut_queue_100_10_10(void)
Definition: queue.c:373
struct tpool_test test[]
Definition: thread_pool.c:45
Definition: sock.c:887
static struct be_ut_queue_cfg be_ut_queue_tests_cfg[BE_UT_QUEUE_NR]
Definition: queue.c:127
struct m0_be_queue * butx_bq
Definition: queue.c:88
op
Definition: libdemo.c:64
#define M0_BE_TX_CREDIT(nr, size)
Definition: tx_credit.h:94
Definition: buf.h:37
M0_INTERNAL int m0_be_queue_init(struct m0_be_queue *bq, struct m0_be_queue_cfg *cfg)
Definition: queue.c:94
int i
Definition: dir.c:1033
struct m0_atomic64 butx_clock
Definition: queue.c:92
static struct nlx_ping_client_params * params
uint64_t butc_producers
Definition: queue.c:66
void m0_be_ut_queue_10_5_100(void)
Definition: queue.c:377
uint64_t butqp_peeks_successful
Definition: queue.c:113
uint64_t bqc_q_size_max
Definition: queue.h:125
M0_INTERNAL void m0_be_queue_fini(struct m0_be_queue *bq)
Definition: queue.c:148
uint64_t butqp_gets_unsuccessful
Definition: queue.c:116
uint64_t butr_put_after
Definition: queue.c:74
#define M0_ASSERT(cond)
void * buqd_user
Definition: queue.c:81
static void be_ut_queue_with_cfg(struct be_ut_queue_cfg *test_cfg)
Definition: queue.c:235
M0_INTERNAL void m0_be_queue_lock(struct m0_be_queue *bq)
Definition: queue.c:190
static uint64_t be_ut_queue_data_index(struct be_ut_queue_ctx *ctx, struct be_ut_queue_data *data)
Definition: queue.c:143
uint64_t butqp_items_nr
Definition: queue.c:111
M0_INTERNAL void m0_ut_threads_stop(struct m0_ut_threads_descr *descr)
Definition: threads.c:52
struct be_ut_queue_ctx * butqp_ctx
Definition: queue.c:98
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
uint64_t butr_get_after
Definition: queue.c:76
void m0_be_ut_queue_2_1_1(void)
Definition: queue.c:369
static struct fdmi_ctx ctx
Definition: main.c:80
void(* utd_thread_func)(void *param)
Definition: threads.h:41
void m0_be_ut_queue_10_100_5(void)
Definition: queue.c:375
#define m0_forall(var, nr,...)
Definition: misc.h:112
void m0_be_ut_queue_10_1_100(void)
Definition: queue.c:376
M0_INTERNAL void m0_be_op_reset(struct m0_be_op *op)
Definition: op.c:152
#define M0_BUF_INIT_PTR(p)
Definition: buf.h:69
M0_INTERNAL uint64_t m0_rnd64(uint64_t *seed)
Definition: misc.c:100
uint64_t butr_put_before
Definition: queue.c:73
uint64_t butc_consumers
Definition: queue.c:67
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
M0_INTERNAL void m0_ut_threads_start(struct m0_ut_threads_descr *descr, int thread_nr, void *param_array, size_t param_size)
Definition: threads.c:28
Definition: list.c:42
static int r[NR]
Definition: thread.c:46
struct m0_atomic64 butx_pos
Definition: queue.c:90
Definition: common.h:34
M0_INTERNAL void m0_be_op_fini(struct m0_be_op *op)
Definition: stubs.c:92
#define M0_BE_QUEUE_PEEK(bq, ptr)
Definition: queue.h:240
#define BE_UT_QUEUE_TEST(q_size_max, producers, consumers, items_nr)
Definition: queue.c:119
uint64_t butc_items_nr
Definition: queue.c:68
#define M0_ASSERT_INFO(cond, fmt,...)
void m0_be_ut_queue_10_100_1(void)
Definition: queue.c:374
uint64_t butc_seed
Definition: queue.c:69
Definition: libdemo.c:64
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
Definition: nucleus.c:42
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
uint64_t butqp_index
Definition: queue.c:109
Definition: op.h:74
struct m0_be_tx_credit buqd_credit
Definition: queue.c:82
M0_INTERNAL void m0_be_queue_end(struct m0_be_queue *bq)
Definition: queue.c:377
M0_INTERNAL void m0_be_op_init(struct m0_be_op *op)
Definition: stubs.c:87
void m0_free(void *data)
Definition: memory.c:146
uint64_t butc_q_size_max
Definition: queue.c:65
void m0_be_ut_queue_100_1_1(void)
Definition: queue.c:370
int32_t rc
Definition: trigger_fop.h:47
#define M0_UT_ASSERT(a)
Definition: ut.h:46
struct be_ut_queue_data * butx_data
Definition: queue.c:93
void m0_be_ut_queue_100_10_1(void)
Definition: queue.c:372
uint64_t butqp_peeks_unsuccessful
Definition: queue.c:114
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
M0_INTERNAL void m0_be_op_wait(struct m0_be_op *op)
Definition: stubs.c:96
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
static void be_ut_queue_try_peek(struct be_ut_queue_thread_param *param, struct be_ut_queue_ctx *ctx)
Definition: queue.c:149