tx_bulk.c
/* -*- C -*- */
/*
 * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * For any questions about this software or licensing,
 * please email opensource@seagate.com or cortx-questions@seagate.com.
 *
 */

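/**
 * tx_bulk opens BE transactions in parallel and executes user-supplied work
 * items inside them: producers enqueue work items (tx credit, payload size
 * and a user pointer) with m0_be_tx_bulk_put(), and a set of workers, spread
 * over partitions and localities, drains the queues, accumulates credits
 * into batches, opens one transaction per batch and runs the configured
 * tbc_do()/tbc_done() callbacks for every item.
 *
 * A minimal usage sketch follows. It is an illustration, not code from this
 * file: the m0_be_queue_cfg values, the "dom" pointer and the scan_do(),
 * scan_done(), do_the_actual_update(), producer_has_items(), credit,
 * payload_size and user names are assumptions made up for the example.
 *
 * @code
 * static void scan_do(struct m0_be_tx_bulk *tb, struct m0_be_tx *tx,
 *                     struct m0_be_op *op, void *datum, void *user,
 *                     uint64_t worker_index, uint64_t partition)
 * {
 *         m0_be_op_active(op);
 *         do_the_actual_update(tx, user);
 *         m0_be_op_done(op);
 * }
 *
 * struct m0_be_tx_bulk     tb = {};
 * struct m0_be_op          op = {};
 * struct m0_be_tx_bulk_cfg cfg = {
 *         .tbc_q_cfg = {
 *                 .bqc_q_size_max       = 0x100,
 *                 .bqc_producers_nr_max = 1,
 *         },
 *         .tbc_workers_nr            = 0x10,
 *         .tbc_partitions_nr         = 1,
 *         .tbc_work_items_per_tx_max = 0x10,
 *         .tbc_dom                   = dom,
 *         .tbc_datum                 = NULL,
 *         .tbc_do                    = &scan_do,
 *         .tbc_done                  = &scan_done,
 * };
 *
 * rc = m0_be_tx_bulk_init(&tb, &cfg);
 * m0_be_op_init(&op);
 * m0_be_tx_bulk_run(&tb, &op);
 * while (producer_has_items()) {
 *         struct m0_be_op put_op = {};
 *         bool            put;
 *
 *         m0_be_op_init(&put_op);
 *         put = m0_be_tx_bulk_put(&tb, &put_op, &credit, payload_size,
 *                                 0, user);
 *         m0_be_op_wait(&put_op);
 *         m0_be_op_fini(&put_op);
 *         if (!put)
 *                 break;
 * }
 * m0_be_tx_bulk_end(&tb);
 * m0_be_op_wait(&op);
 * rc = m0_be_tx_bulk_status(&tb);
 * m0_be_op_fini(&op);
 * m0_be_tx_bulk_fini(&tb);
 * @endcode
 */
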
#define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
#include "lib/trace.h"

#include "be/tx_bulk.h"

#include "lib/memory.h"   /* M0_ALLOC_ARR */
#include "lib/locality.h" /* m0_locality_get */
#include "lib/chan.h"     /* m0_clink */
#include "lib/errno.h"    /* ENOENT */

#include "be/tx.h"        /* m0_be_tx */
#include "be/domain.h"    /* m0_be_domain__group_limits */ /* XXX */

#include "sm/sm.h"        /* m0_sm_ast */


enum {
        /* constants elided in this listing */
};

struct be_tx_bulk_item {
        void                   *bbd_user;
        struct m0_be_tx_credit  bbd_credit;
        m0_bcount_t             bbd_payload_size;
};

#define BTBI_F "(qdata=%p bbd_user=%p bbd_credit="BETXCR_F" " \
               "bbd_payload_size=%" PRIu64 ")"

#define BTBI_P(btbi) (btbi), (btbi)->bbd_user, BETXCR_P(&(btbi)->bbd_credit), \
                     (btbi)->bbd_payload_size

struct be_tx_bulk_worker {
        uint64_t                tbw_index;
        uint64_t                tbw_partition;
        uint64_t                tbw_locality;
        struct m0_be_tx         tbw_tx;
        struct m0_be_tx_bulk   *tbw_tb;
        struct be_tx_bulk_item *tbw_item;
        uint64_t                tbw_items_nr;
        bool                    tbw_queue_get_successful;
        struct m0_sm_ast        tbw_queue_get;
        struct m0_sm_ast        tbw_init;
        struct m0_sm_ast        tbw_close;
        struct m0_sm_ast        tbw_finish;
        void                   *tbw_user;
        struct m0_clink         tbw_clink;
        struct m0_sm_group     *tbw_grp;
        int                     tbw_rc;
        struct m0_be_op         tbw_op;
        bool                    tbw_failed;
        bool                    tbw_done;
        bool                    tbw_terminate_order;
};

static void be_tx_bulk_finish_cb(struct m0_sm_group *grp,
                                 struct m0_sm_ast *ast);
static void be_tx_bulk_queue_get_cb(struct m0_sm_group *grp,
                                    struct m0_sm_ast *ast);
static void be_tx_bulk_init_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast);
static bool be_tx_bulk_open_cb(struct m0_clink *clink);
static void be_tx_bulk_close_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast);
static void be_tx_bulk_gc_cb(struct m0_be_tx *tx, void *param);
static void be_tx_bulk_queue_get_done_cb(struct m0_be_op *op, void *param);

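/*
 * Worker life cycle, as wired up below (each step is an AST post or a
 * callback):
 *
 *   tbw_queue_get AST (be_tx_bulk_queue_get_cb) -> M0_BE_QUEUE_GET
 *   -> be_tx_bulk_queue_get_done_cb -> tbw_init AST (be_tx_bulk_init_cb),
 *      which peeks more items and accumulates their credits
 *   -> be_tx_bulk_open() -> be_tx_bulk_open_cb (tx ACTIVE or FAILED)
 *   -> tbw_close AST (be_tx_bulk_close_cb): tbc_do() for every item,
 *      then m0_be_tx_close()
 *   -> be_tx_bulk_gc_cb(): tbc_done() for every item, then back to
 *      tbw_queue_get until the queue ends or a tx fails to open, after
 *      which the tbw_finish AST (be_tx_bulk_finish_cb) runs.
 */
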
M0_INTERNAL int m0_be_tx_bulk_init(struct m0_be_tx_bulk     *tb,
                                   struct m0_be_tx_bulk_cfg *tb_cfg)
{
        struct be_tx_bulk_worker *worker;
        uint64_t                  worker_partition;
        uint64_t                  workers_per_partition;
        uint64_t                  partitions_per_locality;
        uint64_t                  localities_nr;
        uint64_t                  i;
        uint64_t                  j;
        uint64_t                  k;
        int                       rc;

        M0_ENTRY("tb = %p", tb);

        M0_PRE(M0_IS0(tb));
        M0_PRE(tb_cfg->tbc_partitions_nr > 0);
        /*
         * Can't have more partitions than workers because each worker handles
         * only one partition.
         */
        M0_PRE(tb_cfg->tbc_partitions_nr <= tb_cfg->tbc_workers_nr);
        M0_PRE(tb_cfg->tbc_work_items_per_tx_max > 0);
        M0_PRE(tb_cfg->tbc_q_cfg.bqc_item_length == 0);
        /*
         * Consumers for the queues are workers, so this value will be set by
         * tx_bulk for each queue separately.
         */
        M0_PRE(tb_cfg->tbc_q_cfg.bqc_consumers_nr_max == 0);

        tb->btb_cfg                     = *tb_cfg;
        tb->btb_tx_open_failed          = false;
        tb->btb_done                    = false;
        tb->btb_termination_in_progress = false;
        m0_mutex_init(&tb->btb_lock);
        M0_ALLOC_ARR(tb->btb_worker, tb->btb_cfg.tbc_workers_nr);
        rc = tb->btb_worker == NULL ? -ENOMEM : 0;
        if (rc != 0) {
                m0_mutex_fini(&tb->btb_lock);
                return M0_ERR(rc);
        }
        M0_ALLOC_ARR(tb->btb_q, tb->btb_cfg.tbc_partitions_nr);
        M0_ASSERT(tb->btb_q != NULL); /* XXX handle allocation error */
        tb->btb_cfg.tbc_q_cfg.bqc_item_length = sizeof(struct be_tx_bulk_item);
        tb->btb_cfg.tbc_q_cfg.bqc_consumers_nr_max = tb->btb_cfg.tbc_workers_nr;
        for (i = 0; i < tb_cfg->tbc_partitions_nr; ++i) {
                rc = m0_be_queue_init(&tb->btb_q[i], &tb->btb_cfg.tbc_q_cfg);
                if (rc != 0)
                        break;
        }
        if (rc != 0) {
                m0_free(tb->btb_worker);
                m0_mutex_fini(&tb->btb_lock);
                return M0_ERR(rc);
        }

        /* XXX take it from elsewhere */
        localities_nr = m0_fom_dom()->fd_localities_nr;
        for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i) {
                worker = &tb->btb_worker[i];
                *worker = (struct be_tx_bulk_worker){
                        .tbw_index                = i,
                        .tbw_tb                   = tb,
                        .tbw_items_nr             = 0,
                        .tbw_queue_get_successful = false,
                        .tbw_grp                  = NULL, /* set below */
                        .tbw_rc                   = 0,
                        .tbw_queue_get = {
                                .sa_cb    = &be_tx_bulk_queue_get_cb,
                                .sa_datum = worker,
                        },
                        .tbw_init = {
                                .sa_cb    = &be_tx_bulk_init_cb,
                                .sa_datum = worker,
                        },
                        .tbw_close = {
                                .sa_cb    = &be_tx_bulk_close_cb,
                                .sa_datum = worker,
                        },
                        .tbw_finish = {
                                .sa_cb    = &be_tx_bulk_finish_cb,
                                .sa_datum = worker,
                        },
                        .tbw_failed = false,
                        .tbw_done   = false,
                };
                m0_be_op_init(&worker->tbw_op);
                m0_be_op_callback_set(&worker->tbw_op,
                                      &be_tx_bulk_queue_get_done_cb,
                                      worker, M0_BOS_DONE);
                M0_ALLOC_ARR(worker->tbw_item,
                             tb->btb_cfg.tbc_work_items_per_tx_max);
                M0_ASSERT(worker->tbw_item != NULL); /* XXX handle error */
        }
        if (tb->btb_cfg.tbc_partitions_nr <= localities_nr) {
                /*
                 * - each locality has a partition assigned;
                 * - each worker takes its work from one of such partitions.
                 */
                for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i) {
                        worker = &tb->btb_worker[i];
                        worker->tbw_locality  = i % localities_nr;
                        worker->tbw_partition = worker->tbw_locality %
                                                tb->btb_cfg.tbc_partitions_nr;
                }
        } else {
                /*
                 * - each partition has a single locality assigned;
                 * - each partition has several workers assigned;
                 * - workers are running in the locality assigned to the
                 *   partition the worker takes its work from.
                 */
                workers_per_partition = tb->btb_cfg.tbc_workers_nr /
                                        tb->btb_cfg.tbc_partitions_nr;
                partitions_per_locality = (tb->btb_cfg.tbc_partitions_nr +
                                           localities_nr - 1) / localities_nr;
                worker = &tb->btb_worker[0];
                worker_partition = 0;
                for (i = 0; i < localities_nr; ++i) {
                        if (i != 0 &&
                            i == tb->btb_cfg.tbc_partitions_nr % localities_nr)
                                --partitions_per_locality;
                        for (j = 0; j < partitions_per_locality; ++j) {
                                if (worker_partition ==
                                    tb->btb_cfg.tbc_partitions_nr -
                                    tb->btb_cfg.tbc_workers_nr %
                                    tb->btb_cfg.tbc_partitions_nr)
                                        ++workers_per_partition;
                                for (k = 0; k < workers_per_partition; ++k) {
                                        worker->tbw_locality  = i;
                                        worker->tbw_partition =
                                                worker_partition;
                                        ++worker;
                                }
                                ++worker_partition;
                        }
                }
                M0_ASSERT(worker_partition == tb->btb_cfg.tbc_partitions_nr);
                M0_ASSERT(worker ==
                          &tb->btb_worker[tb->btb_cfg.tbc_workers_nr]);
                M0_ASSERT(m0_forall(i, tb->btb_cfg.tbc_partitions_nr,
                        M0_IN(m0_reduce(j, tb->btb_cfg.tbc_workers_nr, 0,
                                + (tb->btb_worker[j].tbw_partition == i)),
                              (workers_per_partition,
                               workers_per_partition - 1))));
        }
        for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i) {
                worker = &tb->btb_worker[i];
                worker->tbw_grp = m0_locality_get(worker->tbw_locality)->lo_grp;
        }
        return M0_RC(rc);
}

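/*
 * Worked example of the assignment above (an illustration, not from this
 * file): with tbc_workers_nr = 10, tbc_partitions_nr = 4 and
 * localities_nr = 3, workers_per_partition starts as 10 / 4 = 2 and the
 * remaining 10 % 4 = 2 workers go to the last two partitions:
 *
 *   partition 0: workers 0-1, locality 0
 *   partition 1: workers 2-3, locality 0
 *   partition 2: workers 4-6, locality 1
 *   partition 3: workers 7-9, locality 2
 *
 * which satisfies the M0_ASSERT()s above: every partition ends up with
 * either workers_per_partition or workers_per_partition - 1 workers.
 */
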
M0_INTERNAL void m0_be_tx_bulk_fini(struct m0_be_tx_bulk *tb)
{
        struct be_tx_bulk_worker *worker;
        uint32_t                  i;

        for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i) {
                worker = &tb->btb_worker[i];
                m0_free(worker->tbw_item);
                m0_be_op_fini(&worker->tbw_op);
        }

        for (i = 0; i < tb->btb_cfg.tbc_partitions_nr; ++i)
                m0_be_queue_fini(&tb->btb_q[i]);
        m0_free(tb->btb_q);
        m0_mutex_fini(&tb->btb_lock);
        m0_free(tb->btb_worker);
}

static void be_tx_bulk_lock(struct m0_be_tx_bulk *tb)
{
        m0_mutex_lock(&tb->btb_lock);
}

static void be_tx_bulk_unlock(struct m0_be_tx_bulk *tb)
{
        m0_mutex_unlock(&tb->btb_lock);
}

static void be_tx_bulk_queues_drain(struct m0_be_tx_bulk *tb)
{
        struct be_tx_bulk_item data;
        uint64_t               i;
        bool                   successful;

        for (i = 0; i < tb->btb_cfg.tbc_partitions_nr; ++i) {
                m0_be_queue_lock(&tb->btb_q[i]);
                while (M0_BE_QUEUE_PEEK(&tb->btb_q[i], &data)) {
                        M0_BE_OP_SYNC(op,
                                      M0_BE_QUEUE_GET(&tb->btb_q[i], &op,
                                                      &data, &successful));
                        M0_ASSERT(successful);
                }
                m0_be_queue_unlock(&tb->btb_q[i]);
        }
}

static void be_tx_bulk_finish_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
{
        struct be_tx_bulk_worker *worker = ast->sa_datum;
        struct m0_be_tx_bulk     *tb = worker->tbw_tb;
        uint32_t                  done_nr = 0;
        uint32_t                  i;
        bool                      done;

        M0_ENTRY("tb=%p worker=%p tbw_index=%" PRIu64 " tbw_partition=%" PRIu64
                 " tbw_locality=%"PRIu64, tb, worker, worker->tbw_index,
                 worker->tbw_partition, worker->tbw_locality);
        M0_PRE(ast == &worker->tbw_finish);

        be_tx_bulk_lock(tb);
        M0_ASSERT(!worker->tbw_done);
        worker->tbw_done = true;
        for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i)
                done_nr += tb->btb_worker[i].tbw_done;
        M0_LOG(M0_DEBUG, "done_nr=%"PRIu32, done_nr);
        done = done_nr == tb->btb_cfg.tbc_workers_nr;
        if (done) {
                tb->btb_rc = 0;
                for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i)
                        tb->btb_rc = tb->btb_worker[i].tbw_rc ?: tb->btb_rc;
                tb->btb_done = true;
                M0_LOG(M0_DEBUG, "setting tb=%p btb_done = true", tb);
        }
        be_tx_bulk_unlock(tb);
        if (done) {
                be_tx_bulk_queues_drain(tb);
                m0_be_op_done(tb->btb_op);
        }
        M0_LEAVE();
}

static void be_tx_bulk_queue_get_cb(struct m0_sm_group *grp,
                                    struct m0_sm_ast *ast)
{
        struct be_tx_bulk_worker *worker = ast->sa_datum;
        struct m0_be_tx_bulk     *tb = worker->tbw_tb;
        struct m0_be_queue       *bq = &tb->btb_q[worker->tbw_partition];

        M0_ENTRY("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        M0_PRE(ast == &worker->tbw_queue_get);
        M0_PRE(worker->tbw_items_nr == 0);
        M0_PRE(ergo(worker->tbw_rc != 0, tb->btb_tx_open_failed));

        if (worker->tbw_rc != 0) {
                /* @see be_tx_bulk_open_cb() */
                m0_be_tx_fini(&worker->tbw_tx);
        }
        if (tb->btb_tx_open_failed) {
                m0_sm_ast_post(worker->tbw_grp, &worker->tbw_finish);
        } else {
                m0_be_op_reset(&worker->tbw_op);
                m0_be_queue_lock(bq);
                M0_BE_QUEUE_GET(bq, &worker->tbw_op, &worker->tbw_item[0],
                                &worker->tbw_queue_get_successful);
                m0_be_queue_unlock(bq);
        }
        M0_LEAVE("worker=%p tbw_index=%" PRIu64 " tbw_rc=%d",
                 worker, worker->tbw_index, worker->tbw_rc);
}

static void be_tx_bulk_queue_get_done_cb(struct m0_be_op *op, void *param)
{
        struct be_tx_bulk_worker *worker = param;

        M0_ENTRY("worker=%p tbw_index=%" PRIu64 " tbw_queue_get_successful=%d",
                 worker, worker->tbw_index,
                 !!worker->tbw_queue_get_successful);

        if (worker->tbw_queue_get_successful) {
                worker->tbw_items_nr = 1;
                m0_sm_ast_post(worker->tbw_grp, &worker->tbw_init);
        } else {
                m0_sm_ast_post(worker->tbw_grp, &worker->tbw_finish);
        }
}

static void be_tx_bulk_open(struct be_tx_bulk_worker *worker,
                            struct m0_be_tx_credit   *cred,
                            m0_bcount_t               cred_payload)
{
        struct m0_be_tx_bulk     *tb = worker->tbw_tb;
        struct m0_be_tx_bulk_cfg *tb_cfg = &tb->btb_cfg;
        struct m0_be_tx          *tx = &worker->tbw_tx;

        M0_SET0(tx);
        m0_be_tx_init(tx, 0, tb_cfg->tbc_dom, worker->tbw_grp,
                      NULL, NULL, NULL, NULL);
        m0_be_tx_gc_enable(tx, &be_tx_bulk_gc_cb, worker);

        M0_SET0(&worker->tbw_clink);
        m0_clink_init(&worker->tbw_clink, &be_tx_bulk_open_cb);
        m0_clink_add(&tx->t_sm.sm_chan, &worker->tbw_clink);

        m0_be_tx_prep(tx, cred);
        m0_be_tx_payload_prep(tx, cred_payload);
        m0_be_tx_open(tx);
}

static void be_tx_bulk_init_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
{
        struct be_tx_bulk_worker *worker = ast->sa_datum;
        struct m0_be_tx_credit    accum_credit;
        struct be_tx_bulk_item   *data;
        struct m0_be_tx_bulk     *tb = worker->tbw_tb;
        struct m0_be_queue       *bq = &tb->btb_q[worker->tbw_partition];
        m0_bcount_t               accum_payload_size = 0;
        bool                      successful;

        M0_PRE(ast == &worker->tbw_init);

        M0_ENTRY("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        M0_PRE(worker->tbw_items_nr == 1);

        accum_credit       = worker->tbw_item[0].bbd_credit;
        accum_payload_size = worker->tbw_item[0].bbd_payload_size;
        /* optimisation: don't take the lock when per tx limit is only 1 item */
        if (tb->btb_cfg.tbc_work_items_per_tx_max > 1) {
                m0_be_queue_lock(bq);
                while (worker->tbw_items_nr <
                       tb->btb_cfg.tbc_work_items_per_tx_max) {
                        data = &worker->tbw_item[worker->tbw_items_nr];
                        if (!M0_BE_QUEUE_PEEK(bq, data))
                                break;
                        /* XXX check payload also */
                        if (m0_be_should_break(&tb->btb_cfg.tbc_dom->bd_engine,
                                               &accum_credit,
                                               &data->bbd_credit))
                                break;
                        M0_BE_OP_SYNC(op, M0_BE_QUEUE_GET(bq, &op, data,
                                                          &successful));
                        M0_ASSERT(successful);
                        m0_be_tx_credit_add(&accum_credit, &data->bbd_credit);
                        accum_payload_size += data->bbd_payload_size;
                        ++worker->tbw_items_nr;
                }
                m0_be_queue_unlock(bq);
        }
        be_tx_bulk_open(worker, &accum_credit, accum_payload_size);
        M0_LEAVE("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
}

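/*
 * Runs on the transaction state channel (tbw_clink is armed in
 * be_tx_bulk_open()). When the transaction becomes M0_BTS_ACTIVE the work
 * items are executed via the tbw_close AST; when it becomes M0_BTS_FAILED
 * the whole tx_bulk is marked as failed and the worker winds down through
 * be_tx_bulk_gc_cb() and be_tx_bulk_queue_get_cb().
 */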
static bool be_tx_bulk_open_cb(struct m0_clink *clink)
{
        struct be_tx_bulk_worker *worker;
        struct m0_be_tx_bulk     *tb;
        struct m0_be_tx          *tx;

        worker = container_of(clink, struct be_tx_bulk_worker, tbw_clink);
        M0_ENTRY("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        tx = &worker->tbw_tx;
        tb = worker->tbw_tb;
        if (M0_IN(m0_be_tx_state(tx), (M0_BTS_ACTIVE, M0_BTS_FAILED))) {
                m0_clink_del(&worker->tbw_clink);
                m0_clink_fini(&worker->tbw_clink);

                if (m0_be_tx_state(tx) == M0_BTS_ACTIVE) {
                        m0_sm_ast_post(worker->tbw_grp, &worker->tbw_close);
                } else {
                        be_tx_bulk_lock(tb);
                        tb->btb_tx_open_failed = true;
                        be_tx_bulk_unlock(tb);
                        worker->tbw_rc = tx->t_sm.sm_rc;
                        M0_LOG(M0_ERROR, "tx=%p rc=%d", tx, worker->tbw_rc);
                        worker->tbw_failed = true;
                        /* the operation is not going to be executed */
                        worker->tbw_items_nr = 0;
                        /*
                         * Can't call m0_be_tx_fini(tx) here because
                         * m0_be_tx_put() for M0_BTS_FAILED transaction
                         * is called after worker transition.
                         *
                         * be_tx_bulk_queue_get_cb() will do this.
                         */
                        be_tx_bulk_gc_cb(tx, worker);
                }
        }
        M0_LEAVE("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        return false;
}

static void be_tx_bulk_close_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
{
        struct m0_be_tx_bulk_cfg *tb_cfg;
        struct be_tx_bulk_worker *worker = ast->sa_datum;
        struct m0_be_tx_bulk     *tb;
        uint64_t                  i;

        M0_ENTRY("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        M0_PRE(ast == &worker->tbw_close);
        tb = worker->tbw_tb;
        tb_cfg = &tb->btb_cfg;
        for (i = 0; i < worker->tbw_items_nr; ++i) {
                M0_LOG(M0_DEBUG, "worker=%p tbw_index=%" PRIu64 " bbd_user=%p",
                       worker, worker->tbw_index,
                       worker->tbw_item[i].bbd_user);
                M0_BE_OP_SYNC(op, tb_cfg->tbc_do(tb, &worker->tbw_tx, &op,
                                                 tb_cfg->tbc_datum,
                                                 worker->tbw_item[i].bbd_user,
                                                 worker->tbw_index,
                                                 worker->tbw_partition));
        }
        m0_be_tx_close(&worker->tbw_tx);
        M0_LEAVE("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
}

static void be_tx_bulk_gc_cb(struct m0_be_tx *tx, void *param)
{
        struct be_tx_bulk_worker *worker = param;
        struct m0_be_tx_bulk     *tb;
        uint64_t                  i;

        M0_ENTRY("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
        M0_PRE(tx == &worker->tbw_tx);

        tb = worker->tbw_tb;
        for (i = 0; i < worker->tbw_items_nr; ++i) {
                M0_LOG(M0_DEBUG, "worker=%p tbw_index=%" PRIu64 " bbd_user=%p",
                       worker, worker->tbw_index,
                       worker->tbw_item[i].bbd_user);
                tb->btb_cfg.tbc_done(tb, tb->btb_cfg.tbc_datum,
                                     worker->tbw_item[i].bbd_user,
                                     worker->tbw_index,
                                     worker->tbw_partition);
        }
        worker->tbw_items_nr = 0;
        m0_sm_ast_post(worker->tbw_grp, &worker->tbw_queue_get);

        M0_LEAVE("worker=%p tbw_index=%"PRIu64, worker, worker->tbw_index);
}

M0_INTERNAL void m0_be_tx_bulk_run(struct m0_be_tx_bulk *tb,
                                   struct m0_be_op      *op)
{
        struct be_tx_bulk_worker *worker;
        uint32_t                  i;

        M0_ENTRY();
        tb->btb_op = op;
        m0_be_op_active(tb->btb_op);
        for (i = 0; i < tb->btb_cfg.tbc_workers_nr; ++i) {
                worker = &tb->btb_worker[i];
                M0_LOG(M0_DEBUG, "worker=%p tbw_index=%"PRIu64,
                       worker, worker->tbw_index);
                m0_sm_ast_post(worker->tbw_grp, &worker->tbw_queue_get);
        }
        M0_LEAVE();
}

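/*
 * Adds a work item to the queue of the given partition. The op is signalled
 * once the item is queued, or immediately (with a false return value) when a
 * transaction open has already failed and tx_bulk is winding down.
 */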
M0_INTERNAL bool m0_be_tx_bulk_put(struct m0_be_tx_bulk   *tb,
                                   struct m0_be_op        *op,
                                   struct m0_be_tx_credit *credit,
                                   m0_bcount_t             payload_credit,
                                   uint64_t                partition,
                                   void                   *user)
{
        struct be_tx_bulk_item data = {
                .bbd_user         = user,
                .bbd_credit       = *credit,
                .bbd_payload_size = payload_credit,
        };
        bool                   put_fail;
        M0_ENTRY("tb=%p", tb);

        M0_PRE(partition < tb->btb_cfg.tbc_partitions_nr);

        be_tx_bulk_lock(tb);
        put_fail = tb->btb_tx_open_failed;

        if (put_fail) {
                be_tx_bulk_unlock(tb);
                m0_be_op_active(op);
                m0_be_op_done(op);
                return M0_RC(false);
        }

        m0_be_queue_lock(&tb->btb_q[partition]);
        M0_BE_QUEUE_PUT(&tb->btb_q[partition], op, &data);
        m0_be_queue_unlock(&tb->btb_q[partition]);

        be_tx_bulk_unlock(tb);

        return M0_RC(true);
}

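/*
 * Ends all partition queues: after the queues are drained the workers'
 * M0_BE_QUEUE_GET() completes unsuccessfully and the workers finish.
 * No further m0_be_tx_bulk_put() calls are expected after this.
 */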
M0_INTERNAL void m0_be_tx_bulk_end(struct m0_be_tx_bulk *tb)
{
        uint64_t i;

        for (i = 0; i < tb->btb_cfg.tbc_partitions_nr; ++i) {
                m0_be_queue_lock(&tb->btb_q[i]);
                m0_be_queue_end(&tb->btb_q[i]);
                m0_be_queue_unlock(&tb->btb_q[i]);
        }
}

M0_INTERNAL int m0_be_tx_bulk_status(struct m0_be_tx_bulk *tb)
{
        int rc;

        be_tx_bulk_lock(tb);
        M0_PRE(tb->btb_done);
        rc = tb->btb_rc;
        be_tx_bulk_unlock(tb);
        return rc;
}

#undef M0_TRACE_SUBSYSTEM

/*
 * Local variables:
 * c-indentation-style: "K&R"
 * c-basic-offset: 8
 * tab-width: 8
 * fill-column: 80
 * scroll-step: 1
 * End:
 */
/*
 * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
 */