Motr  M0
tx_bulk.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
33 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
34 #include "lib/trace.h"
35 
36 #include "be/tx_bulk.h"
37 
38 #include "lib/mutex.h" /* m0_mutex */
39 #include "lib/memory.h" /* M0_ALLOC_PTR */
40 #include "lib/errno.h" /* ENOENT */
41 #include "lib/atomic.h" /* m0_atomic64 */
42 
43 #include "be/ut/helper.h" /* m0_be_ut_backend_init */
44 #include "be/op.h" /* m0_be_op */
45 #include "be/queue.h" /* m0_be_queue */
46 
47 #include "ut/ut.h" /* M0_UT_ASSERT */
48 #include "ut/threads.h" /* m0_ut_threads_start */
49 
50 enum {
55 };
56 
60 };
61 
65 };
66 
/*
 * Builds the BE unit-test environment used by the tx_bulk tests.
 *
 * Allocates a be_ut_tx_bulk_be_ctx together with its tbbx_ut_be and
 * tbbx_ut_seg members (each allocation is checked with M0_UT_ASSERT),
 * initialises the backend from a local m0_be_domain_cfg through
 * m0_be_ut_backend_init_cfg(), initialises the segment, and finally calls
 * test_prepare() so that the individual test can set up its own state.
 *
 * @param be_ctx_out   receives the newly allocated context.
 * @param be_cfg       optional overrides; may be NULL. Only non-zero
 *                     tbbc_tx_group_nr / tbbc_tx_nr_max values are acted on.
 * @param test_prepare callback invoked with the backend, the segment and ptr.
 * @param ptr          opaque pointer passed through to test_prepare().
 *
 * NOTE(review): this listing omits several statements of the function (the
 * embedded source-line numbers jump: 89 -> 91, 92 -> 96, 98 -> 100,
 * 102 -> 107, 109 -> 111). How cfg is filled in, what the overrides are
 * assigned to, and the size argument of m0_be_ut_seg_init() are therefore not
 * visible here -- confirm against the full file.
 */
67 static void be_ut_tx_bulk_test_init(struct be_ut_tx_bulk_be_ctx **be_ctx_out,
68  struct be_ut_tx_bulk_be_cfg *be_cfg,
69  void (*test_prepare)
70  (struct m0_be_ut_backend *ut_be,
71  struct m0_be_ut_seg *ut_seg,
72  void *ptr),
73  void *ptr)
74 {
75  struct be_ut_tx_bulk_be_ctx *be_ctx;
76  struct m0_be_domain_cfg cfg = {};
77  int rc;
78 
79  M0_ALLOC_PTR(be_ctx);
80  M0_UT_ASSERT(be_ctx != NULL);
81 
82  M0_ALLOC_PTR(be_ctx->tbbx_ut_be);
83  M0_UT_ASSERT(be_ctx->tbbx_ut_be != NULL);
84  M0_ALLOC_PTR(be_ctx->tbbx_ut_seg);
85  M0_UT_ASSERT(be_ctx->tbbx_ut_seg != NULL);
86  /*
87  * Decrease max group and tx size to reduce seg and log I/O size needed
88  * for tx_bulk UTs.
89  */
/* Caller-supplied overrides: a zero field means "leave as is". */
91  if (be_cfg != NULL) {
92  if (be_cfg->tbbc_tx_group_nr != 0) {
96  be_cfg->tbbc_tx_group_nr);
97  }
98  if (be_cfg->tbbc_tx_nr_max != 0) {
100  be_cfg->tbbc_tx_nr_max;
101  }
102  }
/* The last argument (true) is the mkfs parameter of the init call. */
107  rc = m0_be_ut_backend_init_cfg(be_ctx->tbbx_ut_be, &cfg, true);
108  M0_UT_ASSERT(rc == 0);
109  m0_be_ut_seg_init(be_ctx->tbbx_ut_seg, be_ctx->tbbx_ut_be,
111 
/* Test-specific preparation runs once backend and segment are initialised. */
112  test_prepare(be_ctx->tbbx_ut_be, be_ctx->tbbx_ut_seg, ptr);
113 
114  *be_ctx_out = be_ctx;
115 }
116 
118 {
120  m0_free(be_ctx->tbbx_ut_seg);
122  m0_free(be_ctx->tbbx_ut_be);
123  m0_free(be_ctx);
124 }
125 
/*
 * Runs one m0_be_tx_bulk instance against the UT backend held in be_ctx.
 *
 * The bulk configuration is pointed at the backend's domain (but_dom), the
 * bulk is started with m0_be_tx_bulk_run(), fed by test_work_put() and waited
 * for through an m0_be_op allocated with M0_ALLOC_PTR. Afterwards the value of
 * m0_be_tx_bulk_status() is checked against the expected outcome, and the op
 * and the bulk object are finalised and freed.
 *
 * @param be_ctx        backend context whose tbbx_ut_be supplies the domain.
 * @param tb_cfg        bulk configuration; its tbc_dom field is overwritten
 *                      here.
 * @param test_work_put callback that queues the work items; it receives tb,
 *                      success and ptr.
 * @param ptr           opaque pointer handed to test_work_put().
 * @param success       expected outcome, asserted as equi(rc == 0, success)
 *                      (equi presumably means logical equivalence -- confirm
 *                      in misc.h).
 *
 * NOTE(review): source line 143 is absent from this listing, so the statement
 * that assigns rc before the first assertion is not visible -- presumably a
 * call to m0_be_tx_bulk_init(tb, tb_cfg); confirm against the full file.
 */
126 static void be_ut_tx_bulk_test_run(struct be_ut_tx_bulk_be_ctx *be_ctx,
127  struct m0_be_tx_bulk_cfg *tb_cfg,
128  void (*test_work_put)
129  (struct m0_be_tx_bulk *tb,
130  bool success,
131  void *ptr),
132  void *ptr,
133  bool success)
134 {
135  struct m0_be_tx_bulk *tb;
136  struct m0_be_op *op;
137  int rc;
138 
139  M0_ALLOC_PTR(tb);
140  M0_UT_ASSERT(tb != NULL);
141 
142  tb_cfg->tbc_dom = &be_ctx->tbbx_ut_be->but_dom;
144  M0_UT_ASSERT(rc == 0);
145 
146  M0_ALLOC_PTR(op);
147  M0_UT_ASSERT(op != NULL);
148  m0_be_op_init(op);
149 
/* The producer callback runs after the bulk is started and before the wait. */
150  m0_be_tx_bulk_run(tb, op);
151  test_work_put(tb, success, ptr);
152  m0_be_op_wait(op);
153 
154  m0_be_op_fini(op);
155  m0_free(op);
156 
/* The final status must agree with what the caller expects. */
157  rc = m0_be_tx_bulk_status(tb);
158  M0_UT_ASSERT(equi(rc == 0, success));
159  m0_be_tx_bulk_fini(tb);
160 
161  m0_free(tb);
162 }
163 
164 
165 enum {
167 };
168 
171  void *tbu_pos;
172  void *tbu_end;
173 };
174 
176  bool success,
177  void *ptr)
178 {
179  struct be_ut_tx_bulk_usecase *bu = ptr;
180 
181  M0_UT_ASSERT(success);
182  while (bu->tbu_pos + sizeof(uint64_t) < bu->tbu_end) {
184  m0_be_tx_bulk_put(tb, &op,
185  &M0_BE_TX_CREDIT_TYPE(uint64_t),
186  0, 0, bu->tbu_pos));
187  bu->tbu_pos += sizeof(uint64_t);
188  }
189  m0_be_tx_bulk_end(tb);
190 }
191 
193  struct m0_be_tx *tx,
194  struct m0_be_op *op,
195  void *datum,
196  void *user,
197  uint64_t worker_index,
198  uint64_t partition)
199 {
200  struct be_ut_tx_bulk_usecase *bu = datum;
201  uint64_t *value = user;
202 
204  *value = (uint64_t)value;
206  m0_be_op_done(op);
207 }
208 
210  void *datum,
211  void *user,
212  uint64_t worker_index,
213  uint64_t partition)
214 {
215  /* XXX */
216 }
217 
219  struct m0_be_ut_seg *ut_seg,
220  void *ptr)
221 {
222  struct be_ut_tx_bulk_usecase *bu = ptr;
223 
224  bu->tbu_seg = ut_seg->bus_seg;
227  M0_UT_ASSERT(bu->tbu_pos != NULL);
229 }
230 
232 {
233  struct be_ut_tx_bulk_usecase *bu;
234  struct be_ut_tx_bulk_be_ctx *be_ctx;
235  struct m0_be_tx_bulk_cfg tb_cfg = {
236  .tbc_q_cfg = {
237  .bqc_q_size_max = BE_UT_TX_BULK_Q_SIZE_MAX,
238  .bqc_producers_nr_max = 1,
239  },
240  .tbc_workers_nr = BE_UT_TX_BULK_WORKERS,
241  .tbc_partitions_nr = 1,
242  .tbc_work_items_per_tx_max = 3,
243  .tbc_do = &be_ut_tx_bulk_usecase_do,
244  .tbc_done = &be_ut_tx_bulk_usecase_done,
245  };
246 
247  M0_ALLOC_PTR(bu);
248  M0_UT_ASSERT(bu != NULL);
249  tb_cfg.tbc_datum = bu;
250  be_ut_tx_bulk_test_init(&be_ctx, NULL,
253  bu, true);
254  be_ut_tx_bulk_test_fini(be_ctx);
255  m0_free(bu);
256 }
257 
258 enum {
270 };
271 
280 
281  uint64_t bbs_nr_max;
283  unsigned bbs_cred_bp;
287  unsigned bbs_use_bp;
290 
294  uint32_t bbs_buf_nr;
296  void **bbs_buf;
298 };
299 
301  bool calc_cred,
302  struct m0_be_tx_credit *cred,
303  m0_bcount_t *cred_payload)
304 {
305  struct m0_be_tx_credit cred_v;
306  m0_bcount_t payload_v;
307  unsigned cred_bp;
308  unsigned payload_bp;
309 
310  cred_v = calc_cred ? tbs->bbs_cred : tbs->bbs_use;
311  cred_bp = calc_cred ? tbs->bbs_cred_bp : tbs->bbs_use_bp;
312  payload_v = calc_cred ? tbs->bbs_payload_cred : tbs->bbs_payload_use;
313  payload_bp = calc_cred ? tbs->bbs_payload_cred_bp :
314  tbs->bbs_payload_use_bp;
315 
316  *cred = tbs->bbs_cred_max;
317  *cred_payload = tbs->bbs_payload_max;
318  m0_be_tx_credit_mul_bp(cred, cred_bp);
319  *cred_payload = (*cred_payload * payload_bp) / 10000;
320  m0_be_tx_credit_add(cred, &cred_v);
321  *cred_payload += payload_v;
322 }
323 
325  bool success,
326  void *ptr)
327 {
328  struct be_ut_tx_bulk_state *tbs = ptr;
329  struct m0_be_tx_credit cred;
330  m0_bcount_t cred_payload;
331  uint64_t i;
332  bool put_successful = true;
333 
334  for (i = 0; i < tbs->bbs_nr_max; ++i) {
335  be_ut_tx_bulk_state_calc(tbs, true, &cred, &cred_payload);
336  M0_LOG(M0_DEBUG, "%d "BETXCR_F, (int)i, BETXCR_P(&cred));
337  M0_BE_OP_SYNC(op, put_successful =
338  m0_be_tx_bulk_put(tb, &op, &cred, cred_payload, 0,
339  (void *)i));
340  if (!put_successful)
341  break;
342  }
343  M0_UT_ASSERT(equi(success, put_successful));
344  m0_be_tx_bulk_end(tb);
345 }
346 
/*
 * tbc_do callback of the "state" tests: performs the work for one item
 * inside the transaction tx.
 *
 * datum is the be_ut_tx_bulk_state of the test. user is the item index that
 * the producer passed to m0_be_tx_bulk_put() as a pointer-sized integer; it
 * is converted back to uint64_t and used as the starting buffer index.
 *
 * The amount to use is obtained from be_ut_tx_bulk_state_calc(tbs, false,
 * ...). use.tc_reg_size bytes are then captured into tx in pieces of at most
 * bbs_buf_size bytes, taken from bbs_buf[] starting at start % bbs_buf_nr and
 * wrapping around, with at most bbs_buf_nr pieces. The function asserts that
 * nothing is left to capture, sets the tx payload length to use_payload,
 * zero-fills the payload and signals completion with m0_be_op_done(op).
 *
 * tb, worker_index and partition are not referenced in the visible lines.
 *
 * NOTE(review): source lines 358, 364, 366 and 381 are absent from this
 * listing. The declaration of `left` and the statements that assign `counter`
 * before each assertion are therefore not visible -- presumably counter comes
 * from m0_atomic64_add_return() on tbs->bbs_callback_counter[start]; confirm
 * against the full file.
 */
347 static void be_ut_tx_bulk_state_do(struct m0_be_tx_bulk *tb,
348  struct m0_be_tx *tx,
349  struct m0_be_op *op,
350  void *datum,
351  void *user,
352  uint64_t worker_index,
353  uint64_t partition)
354 {
355  struct be_ut_tx_bulk_state *tbs = datum;
356  struct m0_be_tx_credit use;
357  struct m0_buf buf;
359  m0_bcount_t use_payload;
360  uint64_t counter;
361  uint64_t start = (uint64_t)user;
362  uint64_t i;
363 
365  M0_UT_ASSERT(counter == 1);
/* false selects the "use" values rather than the "cred" values. */
367  be_ut_tx_bulk_state_calc(tbs, false, &use, &use_payload);
368  left = use.tc_reg_size;
/* Capture the region in buffer-sized pieces until nothing is left. */
369  for (i = start; i < start + tbs->bbs_buf_nr; ++i) {
370  if (left == 0)
371  break;
372  buf.b_nob = min_check(left, tbs->bbs_buf_size);
373  buf.b_addr = tbs->bbs_buf[i % tbs->bbs_buf_nr];
374  left -= buf.b_nob;
375  M0_BE_TX_CAPTURE_BUF(tbs->bbs_seg, tx, &buf);
376  }
/* The buffers must have been enough to cover the whole region. */
377  M0_UT_ASSERT(left == 0);
378  tx->t_payload.b_nob = use_payload;
379  memset(tx->t_payload.b_addr, 0, tx->t_payload.b_nob);
380  m0_be_op_done(op);
382  M0_UT_ASSERT(counter == 2);
383 }
384 
386  void *datum,
387  void *user,
388  uint64_t worker_index,
389  uint64_t partition)
390 {
391  struct be_ut_tx_bulk_state *tbs = datum;
392  uint64_t start = (uint64_t)user;
393  uint64_t counter;
394 
396  M0_UT_ASSERT(counter == 3);
397 }
398 
400  struct m0_be_ut_seg *ut_seg,
401  void *ptr)
402 {
403  struct be_ut_tx_bulk_state *tbs = ptr;
404  uint32_t i;
405 
406  tbs->bbs_seg = ut_seg->bus_seg;
408  &tbs->bbs_payload_max);
409  for (i = 0; i < tbs->bbs_buf_nr; ++i) {
411  tbs->bbs_buf_size);
412  M0_UT_ASSERT(tbs->bbs_buf[i] != NULL);
413  }
414 }
415 
416 enum {
422 };
423 
425  struct be_ut_tx_bulk_be_cfg *be_cfg,
426  bool success)
427 {
428  struct be_ut_tx_bulk_be_ctx *be_ctx;
429  struct m0_be_tx_bulk_cfg tb_cfg = {
430  .tbc_q_cfg = {
431  .bqc_q_size_max = BE_UT_TX_BULK_Q_SIZE_MAX,
432  .bqc_producers_nr_max = 1,
433  },
434  .tbc_workers_nr = BE_UT_TX_BULK_WORKERS,
435  .tbc_partitions_nr = 1,
436  .tbc_work_items_per_tx_max = 1,
437  .tbc_do = &be_ut_tx_bulk_state_do,
438  .tbc_done = &be_ut_tx_bulk_state_done,
439  };
440 
441  tb_cfg.tbc_datum = tbs;
444  M0_ALLOC_ARR(tbs->bbs_buf, tbs->bbs_buf_nr);
445  M0_UT_ASSERT(tbs->bbs_buf != NULL);
448 
449  be_ut_tx_bulk_test_init(&be_ctx, NULL,
452  tbs, success);
453  be_ut_tx_bulk_test_fini(be_ctx);
454 
455  M0_UT_ASSERT(ergo(success, m0_forall(i, tbs->bbs_nr_max,
456  m0_atomic64_get(&tbs->bbs_callback_counter[i]) == 3)));
457  M0_UT_ASSERT(ergo(!success, m0_forall(i, tbs->bbs_nr_max,
458  m0_atomic64_get(&tbs->bbs_callback_counter[i]) == 0)));
460  m0_free(tbs->bbs_buf);
461 }
462 
464 {
466  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_EMPTY,
467  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
468  .bbs_cred_bp = 0,
469  .bbs_payload_cred = 0,
470  .bbs_payload_cred_bp = 0,
471  .bbs_use = M0_BE_TX_CREDIT(0, 0),
472  .bbs_use_bp = 0,
473  .bbs_payload_use = 0,
474  .bbs_payload_use_bp = 0,
475  }), NULL, true);
476 }
477 
479 {
481  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_ERROR,
482  .bbs_cred = M0_BE_TX_CREDIT(0, 1),
483  .bbs_cred_bp = 10000,
484  .bbs_payload_cred = 0,
485  .bbs_payload_cred_bp = 0,
486  .bbs_use = M0_BE_TX_CREDIT(0, 0),
487  .bbs_use_bp = 0,
488  .bbs_payload_use = 0,
489  .bbs_payload_use_bp = 0,
490  }), NULL, false);
491 }
492 
494 {
496  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_ERROR,
497  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
498  .bbs_cred_bp = 0,
499  .bbs_payload_cred = 1,
500  .bbs_payload_cred_bp = 10000,
501  .bbs_use = M0_BE_TX_CREDIT(0, 0),
502  .bbs_use_bp = 0,
503  .bbs_payload_use = 0,
504  .bbs_payload_use_bp = 0,
505  }), NULL, false);
506 }
507 
509 {
511  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_LARGE_TX,
512  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
513  .bbs_cred_bp = 10000,
514  .bbs_payload_cred = 0,
515  .bbs_payload_cred_bp = 0,
516  .bbs_use = M0_BE_TX_CREDIT(0, 0),
517  .bbs_use_bp = 10000,
518  .bbs_payload_use = 0,
519  .bbs_payload_use_bp = 0,
520  }), NULL, true);
521 }
522 
524 {
526  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_LARGE_PAYLOAD,
527  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
528  .bbs_cred_bp = 0,
529  .bbs_payload_cred = 0,
530  .bbs_payload_cred_bp = 10000,
531  .bbs_use = M0_BE_TX_CREDIT(0, 0),
532  .bbs_use_bp = 0,
533  .bbs_payload_use = 0,
534  .bbs_payload_use_bp = 10000,
535  }), NULL, true);
536 }
537 
539 {
541  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_LARGE_ALL,
542  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
543  .bbs_cred_bp = 10000,
544  .bbs_payload_cred = 0,
545  .bbs_payload_cred_bp = 10000,
546  .bbs_use = M0_BE_TX_CREDIT(0, 0),
547  .bbs_use_bp = 10000,
548  .bbs_payload_use = 0,
549  .bbs_payload_use_bp = 10000,
550  }), NULL, true);
551 }
552 
554 {
556  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_SMALL_TX,
557  .bbs_cred = M0_BE_TX_CREDIT(0x8, 0x8),
558  .bbs_cred_bp = 0,
559  .bbs_payload_cred = 0x8,
560  .bbs_payload_cred_bp = 0,
561  .bbs_use = M0_BE_TX_CREDIT(0x8, 0x8),
562  .bbs_use_bp = 0,
563  .bbs_payload_use = 0x8,
564  .bbs_payload_use_bp = 0,
565  }), NULL, true);
566 }
567 
569 {
571  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_MEDIUM_TX,
572  .bbs_cred = M0_BE_TX_CREDIT(1, 1),
573  .bbs_cred_bp = 10,
574  .bbs_payload_cred = 1,
575  .bbs_payload_cred_bp = 5,
576  .bbs_use = M0_BE_TX_CREDIT(1, 1),
577  .bbs_use_bp = 10,
578  .bbs_payload_use = 1,
579  .bbs_payload_use_bp = 5,
580  }), NULL, true);
581 }
582 
583 /* m0_be_ut_tx_bulk_medium_tx with 8 tx_groups */
585 {
587  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_MEDIUM_TX,
588  .bbs_cred = M0_BE_TX_CREDIT(1, 1),
589  .bbs_cred_bp = 10,
590  .bbs_payload_cred = 1,
591  .bbs_payload_cred_bp = 5,
592  .bbs_use = M0_BE_TX_CREDIT(1, 1),
593  .bbs_use_bp = 10,
594  .bbs_payload_use = 1,
595  .bbs_payload_use_bp = 5,
596  }),
597  &((struct be_ut_tx_bulk_be_cfg){
598  .tbbc_tx_group_nr = 8,
599  }), true);
600 }
601 
603 {
605  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_MEDIUM_CRED,
606  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
607  .bbs_cred_bp = 100,
608  .bbs_payload_cred = 0,
609  .bbs_payload_cred_bp = 80,
610  .bbs_use = M0_BE_TX_CREDIT(0, 0),
611  .bbs_use_bp = 10,
612  .bbs_payload_use = 0,
613  .bbs_payload_use_bp = 2,
614  }), NULL, true);
615 }
616 
618 {
620  .bbs_nr_max = BE_UT_TX_BULK_TX_NR_LARGE_CRED,
621  .bbs_cred = M0_BE_TX_CREDIT(0, 0),
622  .bbs_cred_bp = 1000,
623  .bbs_payload_cred = 0,
624  .bbs_payload_cred_bp = 500,
625  .bbs_use = M0_BE_TX_CREDIT(0, 0),
626  .bbs_use_bp = 10,
627  .bbs_payload_use = 0,
628  .bbs_payload_use_bp = 2,
629  }), NULL, true);
630 }
631 
632 enum {
641 };
642 
645  void *bubc_start;
647 };
648 
651 };
652 
655  uint64_t bubw_q_size_max;
657  uint64_t bubw_workers_nr;
662 };
663 
665  struct m0_be_ut_seg *ut_seg,
666  void *ptr)
667 {
669 
670  m0_be_ut_alloc(ut_be, ut_seg, &ctx->bubc_start,
672  M0_UT_ASSERT(ctx->bubc_start != NULL);
673  m0_atomic64_set(&ctx->bubc_counter, 0);
674 }
675 
677  bool success,
678  void *ptr)
679 {
681  uint64_t i;
682  uint64_t j;
683  void *addr;
684  bool successful;
685 
686  for (i = 0; i < wi->bubw_work_items_per_partition; ++i) {
687  for (j = 0; j < wi->bubw_partitions_nr; ++j) {
688  addr = wi->bubw_ctx->bubc_start +
690  &wi->bubw_ctx->bubc_counter, 1) %
692  sizeof(uint64_t)));
693  M0_BE_OP_SYNC(op, successful =
694  m0_be_tx_bulk_put(tb, &op,
695  &M0_BE_TX_CREDIT_TYPE(uint64_t),
696  0, j, addr));
697  M0_UT_ASSERT(successful);
698  }
699  }
700  m0_be_tx_bulk_end(tb);
701 }
702 
704  struct m0_be_tx *tx,
705  struct m0_be_op *op,
706  void *datum,
707  void *user,
708  uint64_t worker_index,
709  uint64_t partition)
710 {
711  struct be_ut_tx_bulk_parallel_work_item *wi = datum;
712  uint64_t *value = user;
713 
715  *value = (uint64_t)value;
717  tx, value);
719  m0_be_op_done(op);
720 }
721 
723  void *datum,
724  void *user,
725  uint64_t worker_index,
726  uint64_t partition)
727 {
728  struct be_ut_tx_bulk_parallel_work_item *wi = datum;
729 
731 }
732 
/*
 * Thread body of the parallel tx_bulk test.
 *
 * Repeatedly takes a work item from the queue param->bubp_bq (the get is
 * issued under the queue lock and completed by waiting on op), builds an
 * m0_be_tx_bulk_cfg from the fields of that work item and uses it for one
 * run. The loop ends when the queue get reports !successful. On exit the
 * work item buffer, the op and the configuration are released.
 *
 * @param _param per-thread parameter; the visible lines only use its bubp_bq
 *               member through the local `param`.
 *
 * NOTE(review): many source lines are absent from this listing (the embedded
 * numbers jump: 734 -> 737, 740 -> 743, 755 -> 757, 764 -> 766, 768 -> 770,
 * 770 -> 775, 775 -> 779, 779 -> 781). The declarations of `wi` and `param`,
 * the allocation of tb_cfg, any reset of op between iterations, and the
 * statements that actually run the bulk and check its counters are not
 * visible -- confirm against the full file before relying on this summary.
 */
733 static void be_ut_tx_bulk_parallel_thread(void *_param)
734 {
737  struct m0_be_tx_bulk_cfg *tb_cfg;
738  struct m0_be_op *op;
739  bool successful;
740 
743  M0_ALLOC_PTR(op);
744  M0_UT_ASSERT(op != NULL);
745  m0_be_op_init(op);
746  M0_ALLOC_PTR(wi);
747  M0_UT_ASSERT(wi != NULL);
748  while (1) {
/* Fetch the next work item; the queue is locked only around the get. */
749  m0_be_queue_lock(param->bubp_bq);
750  M0_BE_QUEUE_GET(param->bubp_bq, op, wi, &successful);
751  m0_be_queue_unlock(param->bubp_bq);
752  m0_be_op_wait(op);
/* An unsuccessful get terminates the thread loop. */
753  if (!successful) {
754  break;
755  }
/* Per-item bulk configuration; the work item itself is the datum. */
757  *tb_cfg = (struct m0_be_tx_bulk_cfg) {
758  .tbc_q_cfg = {
759  .bqc_q_size_max = wi->bubw_q_size_max,
760  .bqc_producers_nr_max = 1,
761  },
762  .tbc_workers_nr = wi->bubw_workers_nr,
763  .tbc_partitions_nr = wi->bubw_partitions_nr,
764  .tbc_work_items_per_tx_max =
766  .tbc_datum = wi,
767  .tbc_do = &be_ut_tx_bulk_parallel_do,
768  .tbc_done =
770  };
775  wi, true);
779  wi->bubw_partitions_nr *
781  }
782  m0_free(wi);
783  m0_be_op_fini(op);
784  m0_free(op);
785  m0_free(tb_cfg);
786 }
787 
789 {
793  struct m0_ut_threads_descr *td;
794  struct m0_be_queue *bq;
795  struct m0_be_op *op;
796  uint64_t partitions_nr;
797  uint64_t workers_nr;
798  uint64_t i;
799  int rc;
800 
801  M0_ALLOC_PTR(bq);
802  M0_UT_ASSERT(bq != NULL);
803  rc = m0_be_queue_init(bq, &(struct m0_be_queue_cfg){
804  .bqc_q_size_max = BE_UT_TX_BULK_PARALLEL_QUEUE_SIZE_MAX,
805  .bqc_producers_nr_max = 1,
806  .bqc_consumers_nr_max = BE_UT_TX_BULK_PARALLEL_THREADS_NR,
807  .bqc_item_length = sizeof(*wi),
808  });
809  M0_UT_ASSERT(rc == 0);
810 
811  M0_ALLOC_PTR(ctx);
812  M0_UT_ASSERT(ctx != NULL);
813  be_ut_tx_bulk_test_init(&ctx->bubc_be_ctx,
814  &((struct be_ut_tx_bulk_be_cfg){
815  .tbbc_tx_group_nr = 4,
816  .tbbc_tx_nr_max = 32,
817  }),
819 
822  for (i = 0; i < BE_UT_TX_BULK_PARALLEL_THREADS_NR; ++i) {
824  .bubp_bq = bq,
825  };
826  }
827 
828  M0_ALLOC_PTR(td);
829  M0_UT_ASSERT(td != NULL);
832  params, sizeof(params[0]));
833 
834  M0_ALLOC_PTR(op);
835  M0_UT_ASSERT(op != NULL);
836  m0_be_op_init(op);
837  M0_ALLOC_PTR(wi);
838  M0_UT_ASSERT(wi != NULL);
841  for (partitions_nr = 1;
843  ++partitions_nr) {
844  for (workers_nr = partitions_nr;
846  ++workers_nr) {
847  *wi = (struct be_ut_tx_bulk_parallel_work_item){
848  .bubw_ctx = ctx,
849  .bubw_q_size_max =
851  .bubw_partitions_nr = partitions_nr,
852  .bubw_workers_nr = workers_nr,
853  .bubw_work_items_per_partition =
855  .bubw_work_items_per_tx_max =
857  };
858  m0_be_queue_lock(bq);
859  M0_BE_QUEUE_PUT(bq, op, wi);
860  m0_be_queue_unlock(bq);
861  m0_be_op_wait(op);
863  }
864  }
865  m0_be_op_fini(op);
866  m0_free(op);
867  m0_free(wi);
868  m0_be_queue_lock(bq);
869  m0_be_queue_end(bq);
870  m0_be_queue_unlock(bq);
871 
872  m0_ut_threads_stop(td);
873  m0_free(td);
874 
875  m0_free(params);
876  be_ut_tx_bulk_test_fini(ctx->bubc_be_ctx);
877  m0_free(ctx);
878  m0_be_queue_fini(bq);
879  m0_free(bq);
880 
881 }
882 
883 #undef M0_TRACE_SUBSYSTEM
884 
887 /*
888  * Local variables:
889  * c-indentation-style: "K&R"
890  * c-basic-offset: 8
891  * tab-width: 8
892  * fill-column: 80
893  * scroll-step: 1
894  * End:
895  */
896 /*
897  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
898  */
static void be_ut_tx_bulk_state_do(struct m0_be_tx_bulk *tb, struct m0_be_tx *tx, struct m0_be_op *op, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:347
static void be_ut_tx_bulk_state_test_prepare(struct m0_be_ut_backend *ut_be, struct m0_be_ut_seg *ut_seg, void *ptr)
Definition: tx_bulk.c:399
static void m0_atomic64_inc(struct m0_atomic64 *a)
void m0_be_ut_seg_fini(struct m0_be_ut_seg *ut_seg)
Definition: stubs.c:267
static void ptr(struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:440
struct m0_be_seg * tbu_seg
Definition: tx_bulk.c:170
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL void m0_be_tx_bulk_run(struct m0_be_tx_bulk *tb, struct m0_be_op *op)
Definition: tx_bulk.c:565
M0_INTERNAL int m0_be_tx_bulk_init(struct m0_be_tx_bulk *tb, struct m0_be_tx_bulk_cfg *tb_cfg)
Definition: tx_bulk.c:136
void m0_be_ut_tx_bulk_usecase(void)
Definition: tx_bulk.c:231
struct m0_be_tx_credit bbs_cred
Definition: tx_bulk.c:282
static void be_ut_tx_bulk_parallel_done(struct m0_be_tx_bulk *tb, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:722
#define M0_BE_QUEUE_PUT(bq, op, ptr)
Definition: queue.h:236
M0_INTERNAL int m0_be_ut_backend_init_cfg(struct m0_be_ut_backend *ut_be, const struct m0_be_domain_cfg *cfg, bool mkfs)
Definition: stubs.c:231
#define BETXCR_F
Definition: tx_credit.h:102
M0_INTERNAL void m0_be_tx_bulk_end(struct m0_be_tx_bulk *tb)
Definition: tx_bulk.c:619
#define NULL
Definition: misc.h:38
static void be_ut_tx_bulk_state_done(struct m0_be_tx_bulk *tb, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:385
void m0_be_ut_tx_bulk_small_tx(void)
Definition: tx_bulk.c:553
struct be_ut_tx_bulk_parallel_ctx * bubw_ctx
Definition: tx_bulk.c:654
#define ergo(a, b)
Definition: misc.h:293
static void be_ut_tx_bulk_test_fini(struct be_ut_tx_bulk_be_ctx *be_ctx)
Definition: tx_bulk.c:117
void * b_addr
Definition: buf.h:39
void m0_be_ut_tx_bulk_medium_cred(void)
Definition: tx_bulk.c:602
#define M0_BE_QUEUE_GET(bq, op, ptr, successful)
Definition: queue.h:238
#define M0_LOG(level,...)
Definition: trace.h:167
#define min_check(a, b)
Definition: arith.h:88
size_t bbs_tx_group_nr
Definition: tx_bulk.c:279
#define M0_CASSERT(cond)
void m0_be_ut_tx_bulk_large_tx(void)
Definition: tx_bulk.c:508
struct m0_be_seg * bus_seg
Definition: helper.h:119
void m0_be_ut_tx_bulk_medium_tx(void)
Definition: tx_bulk.c:568
#define M0_BE_OP_SYNC(op_obj, action)
Definition: op.h:190
m0_bcount_t bbs_payload_use
Definition: tx_bulk.c:288
int const char const void * value
Definition: dir.c:325
void m0_be_ut_tx_bulk_large_all(void)
Definition: tx_bulk.c:538
size_t tbbc_tx_group_nr
Definition: tx_bulk.c:63
#define M0_BE_TX_CAPTURE_PTR(seg, tx, ptr)
Definition: tx.h:505
struct m0_be_ut_seg ut_seg
Definition: ad.c:73
void m0_be_ut_seg_init(struct m0_be_ut_seg *ut_seg, struct m0_be_ut_backend *ut_be, m0_bcount_t size)
Definition: stubs.c:256
uint64_t m0_bcount_t
Definition: types.h:77
static int left
Definition: locality.c:280
m0_bcount_t bbs_payload_cred
Definition: tx_bulk.c:284
#define M0_BE_TX_CREDIT_TYPE(type)
Definition: tx_credit.h:97
static void be_ut_tx_bulk_test_init(struct be_ut_tx_bulk_be_ctx **be_ctx_out, struct be_ut_tx_bulk_be_cfg *be_cfg, void(*test_prepare)(struct m0_be_ut_backend *ut_be, struct m0_be_ut_seg *ut_seg, void *ptr), void *ptr)
Definition: tx_bulk.c:67
struct m0_be_tx_credit bec_tx_size_max
Definition: engine.h:73
Definition: sock.c:887
unsigned bbs_payload_use_bp
Definition: tx_bulk.c:289
static void be_ut_tx_bulk_usecase_test_prepare(struct m0_be_ut_backend *ut_be, struct m0_be_ut_seg *ut_seg, void *ptr)
Definition: tx_bulk.c:218
op
Definition: libdemo.c:64
#define equi(a, b)
Definition: misc.h:297
#define M0_BE_TX_CREDIT(nr, size)
Definition: tx_credit.h:94
Definition: buf.h:37
struct m0_atomic64 * bbs_callback_counter
Definition: tx_bulk.c:297
static char * addr
Definition: node_k.c:37
M0_INTERNAL int m0_be_queue_init(struct m0_be_queue *bq, struct m0_be_queue_cfg *cfg)
Definition: queue.c:94
int i
Definition: dir.c:1033
void m0_be_ut_tx_bulk_medium_tx_multi(void)
Definition: tx_bulk.c:584
static struct nlx_ping_client_params * params
void m0_be_ut_backend_cfg_default(struct m0_be_domain_cfg *cfg)
Definition: stubs.c:227
struct m0_be_ut_backend ut_be
Definition: ad.c:72
void m0_be_ut_tx_bulk_error_reg(void)
Definition: tx_bulk.c:478
static void be_ut_tx_bulk_usecase_do(struct m0_be_tx_bulk *tb, struct m0_be_tx *tx, struct m0_be_op *op, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:192
struct m0_be_ut_seg * tbbx_ut_seg
Definition: tx_bulk.c:59
M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
Definition: stubs.c:325
static void be_ut_tx_bulk_state_calc(struct be_ut_tx_bulk_state *tbs, bool calc_cred, struct m0_be_tx_credit *cred, m0_bcount_t *cred_payload)
Definition: tx_bulk.c:300
M0_INTERNAL int m0_be_tx_bulk_status(struct m0_be_tx_bulk *tb)
Definition: tx_bulk.c:630
M0_INTERNAL void m0_be_queue_fini(struct m0_be_queue *bq)
Definition: queue.c:148
#define M0_BE_TX_CAPTURE_BUF(seg, tx, buf)
Definition: tx.h:509
m0_bcount_t b_nob
Definition: buf.h:38
struct m0_be_tx_credit tgc_size_max
Definition: tx_group.h:61
struct m0_be_queue * bubp_bq
Definition: tx_bulk.c:650
struct m0_be_tx_credit bbs_cred_max
Definition: tx_bulk.c:291
void m0_be_ut_tx_bulk_large_payload(void)
Definition: tx_bulk.c:523
struct m0_be_seg * bbs_seg
Definition: tx_bulk.c:293
static int counter
Definition: mutex.c:32
M0_INTERNAL void m0_be_tx_credit_add(struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
Definition: tx_credit.c:44
M0_INTERNAL void m0_ut_threads_stop(struct m0_ut_threads_descr *descr)
Definition: threads.c:52
uint64_t bbs_nr_max
Definition: tx_bulk.c:281
unsigned long tgc_tx_nr_max
Definition: tx_group.h:57
static void be_ut_tx_bulk_state_work_put(struct m0_be_tx_bulk *tb, bool success, void *ptr)
Definition: tx_bulk.c:324
unsigned bbs_use_bp
Definition: tx_bulk.c:287
M0_INTERNAL void m0_be_ut_alloc(struct m0_be_ut_backend *ut_be, struct m0_be_ut_seg *ut_seg, void **ptr, m0_bcount_t size)
Definition: stubs.c:300
Definition: xcode.h:73
void m0_be_ut_tx_bulk_parallel_1_15(void)
Definition: tx_bulk.c:788
M0_INTERNAL void m0_be_op_done(struct m0_be_op *op)
Definition: stubs.c:104
struct m0_be_ut_backend * tbbx_ut_be
Definition: tx_bulk.c:58
#define BETXCR_P(c)
Definition: tx_credit.h:113
struct m0_atomic64 bubw_do_calls
Definition: tx_bulk.c:660
m0_bcount_t bbs_payload_max
Definition: tx_bulk.c:292
static void be_ut_tx_bulk_parallel_do(struct m0_be_tx_bulk *tb, struct m0_be_tx *tx, struct m0_be_op *op, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:703
Definition: seg.h:66
static struct fdmi_ctx ctx
Definition: main.c:80
struct m0_be_domain but_dom
Definition: helper.h:47
m0_bcount_t bbs_buf_size
Definition: tx_bulk.c:295
void(* utd_thread_func)(void *param)
Definition: threads.h:41
static void be_ut_tx_bulk_parallel_test_prepare(struct m0_be_ut_backend *ut_be, struct m0_be_ut_seg *ut_seg, void *ptr)
Definition: tx_bulk.c:664
struct m0_be_tx_credit bbs_use
Definition: tx_bulk.c:286
void m0_be_ut_tx_bulk_error_payload(void)
Definition: tx_bulk.c:493
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
#define m0_forall(var, nr,...)
Definition: misc.h:112
M0_INTERNAL void m0_be_op_reset(struct m0_be_op *op)
Definition: op.c:152
struct m0_atomic64 bubc_counter
Definition: tx_bulk.c:646
struct m0_be_pd_cfg bc_pd_cfg
Definition: domain.h:129
uint32_t bbs_buf_nr
Definition: tx_bulk.c:294
M0_INTERNAL bool m0_be_tx_bulk_put(struct m0_be_tx_bulk *tb, struct m0_be_op *op, struct m0_be_tx_credit *credit, m0_bcount_t payload_credit, uint64_t partition, void *user)
Definition: tx_bulk.c:583
struct be_ut_tx_bulk_be_ctx * bubc_be_ctx
Definition: tx_bulk.c:644
M0_INTERNAL void m0_be_op_active(struct m0_be_op *op)
Definition: stubs.c:100
M0_INTERNAL void m0_be_tx_bulk_fini(struct m0_be_tx_bulk *tb)
Definition: tx_bulk.c:288
static void be_ut_tx_bulk_parallel_thread(void *_param)
Definition: tx_bulk.c:733
static void be_ut_tx_bulk_usecase_work_put(struct m0_be_tx_bulk *tb, bool success, void *ptr)
Definition: tx_bulk.c:175
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
size_t bec_group_nr
Definition: engine.h:61
M0_INTERNAL void m0_ut_threads_start(struct m0_ut_threads_descr *descr, int thread_nr, void *param_array, size_t param_size)
Definition: threads.c:28
static void be_ut_tx_bulk_state_test_run(struct be_ut_tx_bulk_state *tbs, struct be_ut_tx_bulk_be_cfg *be_cfg, bool success)
Definition: tx_bulk.c:424
M0_INTERNAL void m0_be_domain_tx_size_max(struct m0_be_domain *dom, struct m0_be_tx_credit *cred, m0_bcount_t *payload_size)
Definition: domain.c:961
Definition: common.h:34
void m0_be_ut_tx_bulk_empty(void)
Definition: tx_bulk.c:463
M0_INTERNAL void m0_be_op_fini(struct m0_be_op *op)
Definition: stubs.c:92
static int start(struct m0_fom *fom)
Definition: trigger_fom.c:321
struct m0_atomic64 bubw_done_calls
Definition: tx_bulk.c:661
void m0_be_ut_backend_fini(struct m0_be_ut_backend *ut_be)
Definition: stubs.c:242
M0_INTERNAL void m0_be_queue_lock(struct m0_be_queue *bq)
Definition: stubs.c:321
struct m0_buf t_payload
Definition: tx.h:364
struct m0_be_tx_group_cfg bec_group_cfg
Definition: engine.h:71
void m0_be_ut_tx_bulk_large_cred(void)
Definition: tx_bulk.c:617
Definition: nucleus.c:42
M0_INTERNAL void m0_be_tx_credit_mul_bp(struct m0_be_tx_credit *c, unsigned bp)
Definition: tx_credit.c:82
uint32_t bpdc_seg_io_nr
Definition: pd.h:59
unsigned bbs_cred_bp
Definition: tx_bulk.c:283
Definition: op.h:74
M0_INTERNAL void m0_be_queue_end(struct m0_be_queue *bq)
Definition: queue.c:377
M0_INTERNAL void m0_be_op_init(struct m0_be_op *op)
Definition: stubs.c:87
void m0_free(void *data)
Definition: memory.c:146
static void be_ut_tx_bulk_parallel_work_put(struct m0_be_tx_bulk *tb, bool success, void *ptr)
Definition: tx_bulk.c:676
static void be_ut_tx_bulk_usecase_done(struct m0_be_tx_bulk *tb, void *datum, void *user, uint64_t worker_index, uint64_t partition)
Definition: tx_bulk.c:209
struct m0_be_engine_cfg bc_engine
Definition: domain.h:79
static void be_ut_tx_bulk_test_run(struct be_ut_tx_bulk_be_ctx *be_ctx, struct m0_be_tx_bulk_cfg *tb_cfg, void(*test_work_put)(struct m0_be_tx_bulk *tb, bool success, void *ptr), void *ptr, bool success)
Definition: tx_bulk.c:126
int32_t rc
Definition: trigger_fop.h:47
#define M0_UT_ASSERT(a)
Definition: ut.h:46
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
static uint64_t max64u(uint64_t a, uint64_t b)
Definition: arith.h:71
m0_bcount_t tc_reg_size
Definition: tx_credit.h:86
M0_INTERNAL void m0_be_op_wait(struct m0_be_op *op)
Definition: stubs.c:96
size_t tbbc_tx_nr_max
Definition: tx_bulk.c:64
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
Definition: tx.h:280
unsigned bbs_payload_cred_bp
Definition: tx_bulk.c:285