Motr  M0
node_bulk.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/arith.h" /* M0_SWAP */
24 #include "lib/errno.h" /* EALREADY */
25 #include "lib/memory.h" /* M0_ALLOC_PTR */
26 #include "lib/misc.h" /* M0_IN */
27 #include "lib/atomic.h" /* m0_atomic64 */
28 
29 #include "motr/magic.h" /* M0_NET_TEST_BSB_MAGIC */
30 
31 #include "net/test/network.h" /* m0_net_test_network_ctx */
32 #include "net/test/node.h" /* m0_net_test_node_ctx */
33 #include "net/test/node_helper.h" /* m0_net_test_node_ctx */
34 #include "net/test/service.h" /* m0_net_test_service */
35 #include "net/test/ringbuf.h" /* m0_net_test_ringbuf */
36 
37 #include "net/test/node_bulk.h"
38 
39 #define NET_TEST_MODULE_NAME node_bulk
40 #include "net/test/debug.h" /* LOGD */
41 
90  TS_UNUSED = 0,
103 };
104 
111  int bse_cb_rc;
112 };
113 
117  uint64_t bsb_magic;
124  size_t bsb_index;
148 };
149 
153  uint64_t bsp_magic;
155  size_t bsp_index;
164 };
165 
169  uint64_t ssb_magic;
171  size_t ssb_index;
176 };
177 
187 };
188 
189 M0_TL_DESCR_DEFINE(bsb, "buf_status_bulk", static,
190  struct buf_status_bulk, bsb_tlink, bsb_magic,
192 M0_TL_DEFINE(bsb, static, struct buf_status_bulk);
193 
194 M0_TL_DESCR_DEFINE(ssb, "server_status_bulk", static,
195  struct server_status_bulk, ssb_tlink, ssb_magic,
197 M0_TL_DEFINE(ssb, static, struct server_status_bulk);
198 
199 M0_TL_DESCR_DEFINE(bsp, "buf_status_ping", static,
200  struct buf_status_ping, bsp_tlink, bsp_magic,
202 M0_TL_DEFINE(bsp, static, struct buf_status_ping);
203 
237  size_t nbc_bs_nr;
283 };
284 
286 static void sd_update(struct node_bulk_ctx *ctx,
288  enum m0_net_test_nh_msg_status status,
289  enum m0_net_test_nh_msg_direction direction)
290 {
291  m0_net_test_nh_sd_update(&ctx->nbc_nh, type, status, direction);
292 }
293 
/**
 * Transfer machine event callback for the bulk test node.
 * Transfer machine events are currently ignored; buffer completion
 * events are handled separately (presumably via node_bulk_cb() — the
 * buffer callback visible below).
 */
static void node_bulk_tm_event_cb(const struct m0_net_tm_event *ev)
{
	/* nothing for now */
}
298 
299 static const struct m0_net_tm_callbacks node_bulk_tm_cb = {
301 };
302 
303 static struct node_bulk_ctx *
305 {
306  return container_of(net_ctx, struct node_bulk_ctx, nbc_net);
307 }
308 
309 static bool
311  enum transfer_state to,
312  const struct state_transition allowed[],
313  size_t allowed_size)
314 {
315  size_t i;
316 
317  M0_PRE(allowed != NULL);
318  M0_PRE(allowed_size > 0);
319 
320  for (i = 0; i < allowed_size; ++i) {
321  if (allowed[i].sta_from == from && allowed[i].sta_to == to)
322  break;
323  }
324  return i < allowed_size;
325 }
326 
328 {
329  return M0_IN(state, (TS_FAILED, TS_TRANSFERRED, TS_BADMSG));
330 }
331 
333  size_t bs_index,
334  enum transfer_state state)
335 {
336  static const struct state_transition allowed_client[] = {
337  { .sta_from = TS_UNUSED, .sta_to = TS_QUEUED },
338  { .sta_from = TS_QUEUED, .sta_to = TS_BD_SENT },
339  { .sta_from = TS_QUEUED, .sta_to = TS_FAILED },
340  { .sta_from = TS_QUEUED, .sta_to = TS_FAILED1 },
341  { .sta_from = TS_QUEUED, .sta_to = TS_FAILED2 },
342  { .sta_from = TS_BD_SENT, .sta_to = TS_CB_LEFT2 },
343  { .sta_from = TS_BD_SENT, .sta_to = TS_FAILED2 },
344  { .sta_from = TS_CB_LEFT2, .sta_to = TS_CB_LEFT1 },
345  { .sta_from = TS_CB_LEFT2, .sta_to = TS_FAILED1 },
346  { .sta_from = TS_CB_LEFT1, .sta_to = TS_TRANSFERRED },
347  { .sta_from = TS_CB_LEFT1, .sta_to = TS_FAILED },
348  { .sta_from = TS_FAILED2, .sta_to = TS_FAILED1 },
349  { .sta_from = TS_FAILED1, .sta_to = TS_FAILED },
350  { .sta_from = TS_TRANSFERRED, .sta_to = TS_UNUSED },
351  { .sta_from = TS_FAILED, .sta_to = TS_UNUSED },
352  };
353  static const struct state_transition allowed_server[] = {
354  { .sta_from = TS_UNUSED, .sta_to = TS_BD_RECEIVED },
355  { .sta_from = TS_BD_RECEIVED, .sta_to = TS_BADMSG },
356  { .sta_from = TS_BD_RECEIVED, .sta_to = TS_RECEIVING },
357  { .sta_from = TS_RECEIVING, .sta_to = TS_SENDING },
358  { .sta_from = TS_RECEIVING, .sta_to = TS_FAILED },
359  { .sta_from = TS_SENDING, .sta_to = TS_TRANSFERRED },
360  { .sta_from = TS_SENDING, .sta_to = TS_FAILED },
361  { .sta_from = TS_TRANSFERRED, .sta_to = TS_UNUSED },
362  { .sta_from = TS_FAILED, .sta_to = TS_UNUSED },
363  { .sta_from = TS_BADMSG, .sta_to = TS_UNUSED },
364  };
365  enum m0_net_test_role role;
366  struct buf_status_bulk *bs;
367  bool can_change;
368  bool role_client;
369 
370  M0_PRE(ctx != NULL);
371  M0_PRE(bs_index < ctx->nbc_bs_nr);
372  M0_PRE(ctx->nbc_bs != NULL);
373 
374  LOGD("state_change: role = %d, bs_index = %lu, state = %d",
375  ctx->nbc_nh.ntnh_role, bs_index, state);
376 
377  role = ctx->nbc_nh.ntnh_role;
378  bs = &ctx->nbc_bs[bs_index];
379  role_client = role == M0_NET_TEST_ROLE_CLIENT;
380  can_change = node_bulk_state_change_allowed(bs->bsb_ts, state,
381  role_client ? allowed_client : allowed_server,
382  role_client ? ARRAY_SIZE(allowed_client) :
383  ARRAY_SIZE(allowed_server));
384  M0_ASSERT(can_change);
385  bs->bsb_ts = state;
386 
387  /* add to ringbufs if needed */
388  if (state == TS_UNUSED)
389  m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_unused, bs_index);
390  if (node_bulk_state_is_final(state))
391  m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_final, bs_index);
392  /* set start & finish timestamp */
393  if (M0_IN(state, (TS_RECEIVING, TS_QUEUED)))
394  bs->bsb_time_start = m0_time_now();
395  if (state == TS_TRANSFERRED)
397  /* reset buf_status_errno if needed */
398  if (state == TS_UNUSED) {
399  M0_SET0(&bs->bsb_msg);
400  M0_SET0(&bs->bsb_send);
401  M0_SET0(&bs->bsb_recv);
402  }
403 }
404 
406  { .sta_from = TS_BD_SENT, .sta_to = TS_CB_LEFT2 },
407  { .sta_from = TS_CB_LEFT2, .sta_to = TS_CB_LEFT1 },
408  { .sta_from = TS_CB_LEFT1, .sta_to = TS_TRANSFERRED },
409  { .sta_from = TS_FAILED2, .sta_to = TS_FAILED1 },
410  { .sta_from = TS_FAILED1, .sta_to = TS_FAILED },
411 };
413  { .sta_from = TS_BD_SENT, .sta_to = TS_FAILED2 },
414  { .sta_from = TS_CB_LEFT2, .sta_to = TS_FAILED1 },
415  { .sta_from = TS_CB_LEFT1, .sta_to = TS_FAILED },
416  { .sta_from = TS_FAILED2, .sta_to = TS_FAILED1 },
417  { .sta_from = TS_FAILED1, .sta_to = TS_FAILED },
418 };
420  { .sta_from = TS_RECEIVING, .sta_to = TS_SENDING },
421  { .sta_from = TS_SENDING, .sta_to = TS_TRANSFERRED },
422 };
424  { .sta_from = TS_RECEIVING, .sta_to = TS_FAILED },
425  { .sta_from = TS_SENDING, .sta_to = TS_FAILED },
426 };
427 
428 static const struct {
430  const size_t nbst_nr;
432 #define TRANSITION(name) { \
433  .nbst_transition = name, \
434  .nbst_nr = ARRAY_SIZE(name), \
435 }
440 #undef TRANSITION
441 };
442 
444 static void node_bulk_state_check(const struct state_transition state_list[],
445  size_t state_nr)
446 {
447  size_t i;
448  size_t j;
449 
450  for (i = 0; i < state_nr; ++i) {
451  for (j = i + 1; j < state_nr; ++j) {
452  M0_ASSERT(state_list[i].sta_from !=
453  state_list[j].sta_from);
454  }
455  }
456 }
457 
459 static void node_bulk_state_check_all(void)
460 {
461  size_t i;
462 
463  for (i = 0; i < ARRAY_SIZE(node_bulk_state_transitions); ++i) {
467  }
468 }
469 
470 static enum transfer_state
472  const struct state_transition state_list[],
473  size_t state_nr)
474 {
475  size_t i;
476 
477  for (i = 0; i < state_nr; ++i) {
478  if (state_list[i].sta_from == state)
479  return state_list[i].sta_to;
480  }
481  M0_IMPOSSIBLE("Invalid 'from' state in net-test bulk testing.");
482  return TS_UNUSED;
483 }
484 
486  size_t bs_index,
487  bool success)
488 {
489  const struct state_transition *transition;
490  size_t transition_size;
491  enum transfer_state state;
492 
493  M0_PRE(ctx != NULL);
494  M0_PRE(bs_index < ctx->nbc_bs_nr);
495  M0_PRE(ctx->nbc_bs != NULL);
496 
497  if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
498  transition = success ?
501  transition_size = success ?
504  } else if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_SERVER) {
505  transition = success ?
508  transition_size = success ?
511  } else {
512  transition = NULL;
513  transition_size = 0;
514  M0_IMPOSSIBLE("Invalid node role in net-test bulk testing");
515  }
516  state = node_bulk_state_search(ctx->nbc_bs[bs_index].bsb_ts,
517  transition, transition_size);
518  node_bulk_state_change(ctx, bs_index, state);
519 }
520 
522  size_t bs_index)
523 {
524  struct buf_status_bulk *bs;
525  bool role_client;
526  m0_time_t rtt;
527 
528  M0_PRE(ctx != NULL);
529  M0_PRE(bs_index < ctx->nbc_bs_nr);
530 
531  role_client = ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT;
532  bs = &ctx->nbc_bs[bs_index];
533  /* Check final states */
534  M0_ASSERT(M0_IN(bs->bsb_ts, (TS_TRANSFERRED, TS_FAILED)) ||
535  (!role_client && bs->bsb_ts == TS_BADMSG));
536  switch (bs->bsb_ts) {
537  case TS_TRANSFERRED:
540  m0_net_test_nh_sd_update_rtt(&ctx->nbc_nh, rtt);
541  break;
542  case TS_FAILED:
544  break;
545  case TS_BADMSG:
547  break;
548  default:
549  M0_IMPOSSIBLE("Impossible final state in "
550  "net-test bulk testing");
551  }
553 }
554 
556 {
557  size_t bs_index;
558  size_t i;
559  size_t nr;
560 
561  M0_PRE(ctx != NULL);
562 
563  nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_final);
564  for (i = 0; i < nr; ++i) {
565  bs_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_final);
567  }
568  M0_POST(m0_net_test_ringbuf_is_empty(&ctx->nbc_rb_bulk_final));
569 }
570 
572 {
573  size_t index;
574  size_t i;
575  size_t nr;
576  int rc;
577 
578  M0_PRE(ctx != NULL);
579 
580  /*
581  * Below lock is added for sync between server thread and call back
582  * from node_bulk_cb().
583  */
584  m0_mutex_lock(&ctx->nbc_bulk_mutex);
585  nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_ping_unused);
586  for (i = 0; i < nr; ++i) {
587  index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_ping_unused);
589  if (rc != 0) {
591  m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused,
592  index);
593  }
594  }
595  m0_mutex_unlock(&ctx->nbc_bulk_mutex);
596 }
597 
599  size_t buf_bulk_index)
600 {
602  buf_bulk_index);
603 }
604 
605 static void buf_desc_set0(struct node_bulk_ctx *ctx,
606  size_t buf_bulk_index)
607 {
608  M0_PRE(ctx != NULL);
609  M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
610 
611  M0_SET0(&net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc);
612  M0_SET0(&ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
613 }
614 
619 static void buf_desc_swap(struct node_bulk_ctx *ctx,
620  size_t buf_bulk_index)
621 {
622  M0_PRE(ctx != NULL);
623  M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
624 
625  M0_SWAP(net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc,
626  ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
627 }
628 
630  size_t buf_bulk_index)
631 {
632  M0_PRE(ctx != NULL);
633  M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
634 
635  m0_net_desc_free(&net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc);
636  m0_net_desc_free(&ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
637 }
638 
640  size_t bs_index)
641 {
642  M0_PRE(ctx != NULL);
643  M0_PRE(bs_index * 2 + 1 < ctx->nbc_buf_bulk_nr);
644 
645  m0_net_desc_free(&net_buf_bulk_get(ctx, bs_index * 2)->nb_desc);
646  m0_net_desc_free(&net_buf_bulk_get(ctx, bs_index * 2 + 1)->nb_desc);
647 }
648 
650  size_t buf_bulk_index,
651  size_t buf_ping_index,
653 {
654  m0_bcount_t len;
655  m0_bcount_t len_total;
656 
657  M0_PRE(ctx != NULL);
658  M0_PRE(buf_ping_index < ctx->nbc_buf_ping_nr);
659  M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
660 
661  buf_desc_set0(ctx, buf_bulk_index);
662  /* decode network buffer descriptors for active bulk receiving */
664  &ctx->nbc_net, buf_bulk_index,
665  buf_ping_index, offset);
666  if (len == 0)
667  return 0;
668  len_total = net_test_len_accumulate(0, len);
669 
670  /*
671  * buf->nb_desc = zero descriptor
672  * bs->bsb_desc_send = descriptor for active bulk receiving
673  */
674  buf_desc_swap(ctx, buf_bulk_index);
675 
677  &ctx->nbc_net, buf_bulk_index,
678  buf_ping_index,
679  offset + len_total);
680  if (len == 0) {
681  /* free already allocated network descriptor */
682  buf_desc_server_free(ctx, buf_bulk_index);
683  return 0;
684  }
685  len_total = net_test_len_accumulate(len_total, len);
686 
687  /*
688  * buf->nb_desc = descriptor for active bulk receiving
689  * bs->bsb_desc_send = descriptor for active bulk sending
690  */
691  buf_desc_swap(ctx, buf_bulk_index);
692 
693  return len_total;
694 }
695 
703  size_t buf_ping_index,
705 {
706  m0_bcount_t len;
707  size_t buf_bulk_index;
708  bool no_unused_buf;
709  int rc;
710 
711  no_unused_buf = m0_net_test_ringbuf_is_empty(&ctx->nbc_rb_bulk_unused);
712  if (no_unused_buf) {
713  LOGD("--- NO UNUSED BUFS");
715  return 0;
716  }
717 
718  /* get unused buf */
719  buf_bulk_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_unused);
720  M0_ASSERT(buf_bulk_index < ctx->nbc_buf_bulk_nr);
721  node_bulk_state_change(ctx, buf_bulk_index, TS_BD_RECEIVED);
722  /* deserialize network buffer descriptors */
723  len = buf_desc_deserialize(ctx, buf_bulk_index, buf_ping_index, offset);
724  if (len == 0) {
725  LOGD("BADMSG: buf_bulk_index = %lu, "
726  "buf_ping_index = %lu, offset = %lu",
727  buf_bulk_index, buf_ping_index, (unsigned long) offset);
728  /* ping buffer contains invalid data */
729  node_bulk_state_change(ctx, buf_bulk_index, TS_BADMSG);
730  return 0;
731  }
732  node_bulk_state_change(ctx, buf_bulk_index, TS_RECEIVING);
733  /* start active bulk receiving */
734  rc = m0_net_test_network_bulk_enqueue(&ctx->nbc_net, buf_bulk_index, 0,
736  ctx->nbc_bs[buf_bulk_index].bsb_recv.bse_func_rc = rc;
737  if (rc != 0) {
738  /*
 739  * Adding the buffer to the network queue failed.
740  * Free allocated (when deserialized) network descriptors.
741  */
742  node_bulk_state_change(ctx, buf_bulk_index, TS_FAILED);
743  buf_desc_server_free(ctx, buf_bulk_index);
745  }
746  return rc == 0 ? len : 0;
747 }
748 
750  size_t buf_index,
751  enum m0_net_queue_type q,
752  const struct m0_net_buffer_event *ev)
753 {
755  m0_bcount_t len;
756  size_t nr;
757  size_t i;
758  int rc;
759 
760  M0_PRE(ctx != NULL);
761  M0_PRE(ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_SERVER);
762  M0_PRE(ergo(q == M0_NET_QT_MSG_RECV, buf_index < ctx->nbc_buf_ping_nr));
765  buf_index < ctx->nbc_buf_bulk_nr));
766 
767  if (q == M0_NET_QT_MSG_RECV) {
768  if (ev->nbe_status != 0)
769  return;
770  nr = m0_net_test_network_bd_nr(&ctx->nbc_net, buf_index);
771  if (nr % 2 != 0) {
772  LOGD("MS_BAD: nr = %lu", nr);
774  return;
775  }
776  nr /= 2;
777  offset = 0;
778  for (i = 0; i < nr; ++i) {
779  len = node_bulk_server_transfer_start(ctx, buf_index,
780  offset);
781  offset += len;
782  }
783  } else if (q == M0_NET_QT_ACTIVE_BULK_RECV) {
784  if (ev->nbe_status != 0) {
785  LOGD("--- active bulk recv FAILED!");
786  buf_desc_server_free(ctx, buf_index);
787  return;
788  }
789  /*
790  * Don't free m0_net_buf_desc here to avoid
791  * memory allocator delays.
792  */
793  buf_desc_swap(ctx, buf_index);
795  buf_index, 0,
797  ctx->nbc_bs[buf_index].bsb_send.bse_func_rc = rc;
798  if (rc != 0) {
799  LOGD("--- active bulk send FAILED!");
800  buf_desc_server_free(ctx, buf_index);
801  node_bulk_state_change(ctx, buf_index, TS_FAILED);
803  }
804  } else if (q == M0_NET_QT_ACTIVE_BULK_SEND) {
805  buf_desc_server_free(ctx, buf_index);
806  }
807 }
808 
810  size_t buf_index,
811  enum m0_net_queue_type q,
812  const struct m0_net_buffer_event *ev)
813 {
814  struct buf_status_bulk *bs;
815 
816  M0_PRE(ctx != NULL);
817  M0_PRE(ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT);
818  M0_PRE(ergo(q == M0_NET_QT_MSG_SEND, buf_index < ctx->nbc_buf_ping_nr));
821  buf_index < ctx->nbc_buf_bulk_nr));
822 
823  if (q == M0_NET_QT_MSG_SEND) {
824  /*
825  * Change state for every bulk buffer, which
826  * descriptor is stored in current message.
827  */
828  m0_tl_teardown(bsb, &ctx->nbc_bsp[buf_index].bsp_buffers, bs) {
829  bs->bsb_msg.bse_cb_rc = ev->nbe_status;
831  ev->nbe_status == 0);
832  }
833  } else if (M0_IN(q, (M0_NET_QT_PASSIVE_BULK_RECV,
835  bs = &ctx->nbc_bs[buf_index / 2];
837  buf_desc_client_free(ctx, buf_index / 2);
838  }
839 }
840 
842 {
843  return m0_atomic64_get(&ctx->nbc_stop_flag) == 1;
844 }
845 
846 static void node_bulk_cb(struct m0_net_test_network_ctx *net_ctx,
847  const uint32_t buf_index,
848  enum m0_net_queue_type q,
849  const struct m0_net_buffer_event *ev)
850 {
851  struct buf_status_bulk *bs;
852  size_t bs_index;
853  struct buf_status_errno *bs_e;
854  struct node_bulk_ctx *ctx = node_bulk_ctx_from_net_ctx(net_ctx);
855  bool role_client;
856  bool buf_send;
857  bool buf_bulk;
858 
859  LOGD("node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d"
860  ", ev-nbe_status = %d",
861  net_ctx->ntc_tm->ntm_ep->nep_addr, buf_index, q, ev->nbe_status);
862  M0_PRE(net_ctx != NULL);
863  role_client = ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT;
864  M0_PRE(ergo(q == M0_NET_QT_MSG_RECV, !role_client));
865  M0_PRE(ergo(q == M0_NET_QT_MSG_SEND, role_client));
866  M0_PRE(ergo(q == M0_NET_QT_PASSIVE_BULK_RECV, role_client));
867  M0_PRE(ergo(q == M0_NET_QT_PASSIVE_BULK_SEND, role_client));
868  M0_PRE(ergo(q == M0_NET_QT_ACTIVE_BULK_RECV, !role_client));
869  M0_PRE(ergo(q == M0_NET_QT_ACTIVE_BULK_SEND, !role_client));
871  buf_index < ctx->nbc_buf_ping_nr));
872 
873  if (ev->nbe_status != 0 && ev->nbe_status != -ECANCELED) {
874  LOGD("--CALLBACK ERROR! errno = %d", ev->nbe_status);
875  LOGD("node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d"
876  ", ev-nbe_status = %d",
877  net_ctx->ntc_tm->ntm_ep->nep_addr, buf_index, q,
878  ev->nbe_status);
879  }
880 
881  ctx->nbc_callback_executed = true;
882  buf_bulk = false;
883  if (M0_IN(q,
886  buf_bulk = true;
887  bs_index = role_client ? buf_index / 2 : buf_index;
888  M0_ASSERT(bs_index < ctx->nbc_bs_nr);
889  bs = &ctx->nbc_bs[bs_index];
890  bs_e = q == M0_NET_QT_PASSIVE_BULK_RECV ||
894  M0_ASSERT(bs_e != NULL);
895  bs_e->bse_cb_rc = ev->nbe_status;
896  node_bulk_state_change_cb(ctx, bs_index, ev->nbe_status == 0);
897  }
898  (role_client ? node_bulk_cb_client : node_bulk_cb_server)
899  (ctx, buf_index, q, ev);
900  if (M0_IN(q, (M0_NET_QT_MSG_SEND, M0_NET_QT_MSG_RECV))) {
901  /* ping buffer can be reused now */
902  m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused, buf_index);
903  }
904  if (!role_client && q == M0_NET_QT_MSG_RECV) {
907  }
908  /* update stats */
909  buf_send = q == M0_NET_QT_PASSIVE_BULK_SEND ||
911  sd_update(ctx, buf_bulk ? MT_BULK : MT_MSG,
912  ev->nbe_status == 0 ? MS_SUCCESS : MS_FAILED,
913  buf_send ? MD_SEND : MD_RECV);
914  /* state transitions from final states */
916 }
917 
919  .ntnbc_cb = {
926  }
927 };
928 
930 static size_t client_server_index(struct node_bulk_ctx *ctx, size_t bs_index)
931 {
932  M0_PRE(ctx != NULL);
933  M0_PRE(ctx->nbc_client_concurrency > 0);
934 
935  return bs_index / ctx->nbc_client_concurrency;
936 }
937 
940  size_t buf_index)
941 {
942  enum m0_net_queue_type q;
943  size_t server_index;
944 
945  q = buf_index % 2 == 0 ? M0_NET_QT_PASSIVE_BULK_SEND :
947  server_index = client_server_index(ctx, buf_index / 2);
948  return m0_net_test_network_bulk_enqueue(&ctx->nbc_net, buf_index,
949  server_index, q);
950 }
951 
953  size_t buf_index)
954 {
956  buf_index);
957 }
958 
960  size_t bs_index)
961 {
962  struct buf_status_bulk *bs;
963  int rc;
964 
965  M0_PRE(ctx != NULL);
966  M0_PRE(bs_index < ctx->nbc_bs_nr);
967 
968  bs = &ctx->nbc_bs[bs_index];
970  rc = client_bulk_enqueue(ctx, bs_index * 2);
971  bs->bsb_send.bse_func_rc = rc;
972  if (rc != 0) {
975  return;
976  }
977  rc = client_bulk_enqueue(ctx, bs_index * 2 + 1);
978  bs->bsb_recv.bse_func_rc = rc;
979  if (rc != 0) {
981  client_bulk_dequeue(ctx, bs_index * 2);
983  return;
984  }
985  m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_queued, bs_index);
986 }
987 
989 {
990  size_t i;
991  size_t nr;
992  size_t bs_index;
993  bool transfer_next;
994 
995  M0_PRE(ctx != NULL);
996 
997  nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_unused);
998  for (i = 0; i < nr; ++i) {
999  /* Check stop conditions */
1000  transfer_next = m0_net_test_nh_transfer_next(&ctx->nbc_nh);
1001  /*
1002  LOGD("client: transfer_next = %s",
1003  transfer_next ? (char *) "true" : (char *) "false");
1004  */
1005  if (!transfer_next)
1006  break;
1007  /* Start next transfer */
1008  bs_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_unused);
1009  /*
1010  LOGD("client: next transfer bs_index = %lu",
1011  bs_index);
1012  */
1013  client_transfer_start(ctx, bs_index);
1014  }
1015 }
1016 
1018  size_t bsb_index,
1019  size_t msg_buf_index,
1021 {
1022  m0_bcount_t len;
1023  m0_bcount_t len_total;
1024 
1026  net, bsb_index * 2,
1027  msg_buf_index, offset);
1028  if (len == 0)
1029  return 0;
1030  len_total = net_test_len_accumulate(0, len);
1031 
1033  net, bsb_index * 2 + 1,
1034  msg_buf_index,
1035  offset + len_total);
1036  if (len == 0) {
1037  /*
1038  * If first buffer descriptor serializing succeed,
1039  * then number of serialized network buffer descriptors
1040  * is increased. But if second serializing fails, then
1041  * number of network buffer descriptors inside ping
1042  * buffer should be decreased because these two
1043  * descriptors should be added both or should not be
1044  * added at all.
1045  */
1046  m0_net_test_network_bd_nr_dec(net, msg_buf_index);
1047  }
1048  len_total = net_test_len_accumulate(len_total, len);
1049  return len_total;
1050 }
1051 
1052 
1054  struct buf_status_bulk *bs)
1055 {
1056  client_bulk_dequeue(ctx, bs->bsb_index * 2);
1057  client_bulk_dequeue(ctx, bs->bsb_index * 2 + 1);
1058 }
1059 
1060 static void client_bds_send(struct node_bulk_ctx *ctx,
1061  struct server_status_bulk *ss)
1062 {
1063  struct m0_net_test_ringbuf *rb_ping = &ctx->nbc_rb_ping_unused;
1064  struct buf_status_bulk *bs;
1065  /* Message buffer was taken from unused list */
1066  bool msg_taken;
1067  /* Message buffer index, makes sense iff (msg_taken) */
1068  size_t msg_index = 0;
1069  /* Number of buffer descriptors in selected message buffer */
1070  size_t msg_bd_nr = 0;
1071  struct buf_status_ping *msg_bs = 0;
1072  m0_bcount_t msg_offset = 0;
1073  m0_bcount_t len;
1074  bool buf_last;
1075  int rc;
1076  struct m0_tl messages;
1077  bool list_is_empty;
1078 
1079  M0_PRE(ctx != NULL);
1080  M0_PRE(ss != NULL);
1081  M0_PRE(ctx->nbc_bd_nr_max > 0 && ctx->nbc_bd_nr_max % 2 == 0);
1082 
1083  bsp_tlist_init(&messages);
1084  msg_taken = false;
1085  m0_tl_for(bsb, &ss->ssb_buffers, bs) {
1086 take_msg:
1087  if (!msg_taken && m0_net_test_ringbuf_is_empty(rb_ping)) {
1088  /*
1089  * No free message to transfer bulk buffer
 1090  * network descriptors. Cancel transfers.
1091  */
1095  bsb_tlist_del(bs);
1096  continue;
1097  }
1098  /* Take unused msg buf number if it wasn't taken before */
1099  if (!msg_taken) {
1100  msg_index = m0_net_test_ringbuf_pop(rb_ping);
1101  msg_taken = true;
1102  msg_bd_nr = 0;
1103  msg_offset = 0;
1104  msg_bs = &ctx->nbc_bsp[msg_index];
1105  list_is_empty =
1106  bsb_tlist_is_empty(&msg_bs->bsp_buffers);
1107  M0_ASSERT(list_is_empty);
1108  }
1109  /* Try to serialize two buffer descriptors */
1110  len = client_bds_serialize2(&ctx->nbc_net, bs->bsb_index,
1111  msg_index, msg_offset);
1112  /*
1113  LOGD("msg_index = %lu, len = %lu, msg_offset = %lu",
1114  (unsigned long ) msg_index, (unsigned long ) len,
1115  (unsigned long ) msg_offset);
1116  */
1117  msg_offset += len;
1118  if (len == 0) {
1119  if (msg_bd_nr > 0) {
1120  /* No free space in ping buffer */
1121  bsp_tlist_add_tail(&messages, msg_bs);
1122  msg_taken = false;
1123  goto take_msg;
1124  } else {
1125  /*
1126  * Serializing failed for unknown reason
1127  * (or ping buffer is smaller than
1128  * size of two serialized bulk
1129  * network buffer descriptors)
1130  */
1132  TS_FAILED2);
1134  bsb_tlist_del(bs);
1135  msg_taken = false;
1136  }
1137  } else {
1138  buf_last = bsb_tlist_next(&ss->ssb_buffers, bs) == NULL;
1139  bsb_tlist_del(bs);
1140  bsb_tlist_add_tail(&msg_bs->bsp_buffers, bs);
1141  msg_bd_nr += 2;
1142  if (msg_bd_nr == ctx->nbc_bd_nr_max || buf_last) {
1143  bsp_tlist_add_tail(&messages, msg_bs);
1144  msg_taken = false;
1145  }
1146  }
1147  } m0_tl_endfor;
1148  M0_ASSERT(!msg_taken);
1149  m0_tl_for(bsp, &messages, msg_bs) {
1150  list_is_empty = bsb_tlist_is_empty(&msg_bs->bsp_buffers);
1151  M0_ASSERT(!list_is_empty);
1152  /*
1153  * Change state to BD_SENT for every bulk buffer, which
1154  * descriptor is stored in current message.
1155  */
1156  m0_tl_for(bsb, &msg_bs->bsp_buffers, bs) {
1158  } m0_tl_endfor;
1159  rc = m0_net_test_network_msg_send(&ctx->nbc_net,
1160  msg_bs->bsp_index,
1161  ss->ssb_index);
1162  if (rc != 0) {
1163  LOGD("--- msg send FAILED!");
1165  }
1166  /* Save rc for future analysis */
1167  m0_tl_for(bsb, &msg_bs->bsp_buffers, bs) {
1168  bs->bsb_msg.bse_func_rc = rc;
1169  /* Change state if msg sending failed */
1170  if (rc != 0) {
1172  TS_FAILED2);
1174  bsb_tlist_del(bs);
1175  }
1176  } m0_tl_endfor;
1177  bsp_tlist_del(msg_bs);
1178  } m0_tl_endfor;
1179  bsp_tlist_fini(&messages);
1180 }
1181 
1183 {
1184  struct server_status_bulk *ss;
1185  struct buf_status_bulk *bs;
1186  struct m0_tl servers;
1187  size_t index;
1188  size_t i;
1189  size_t nr;
1190 
1191  M0_PRE(ctx != NULL);
1192 
1193  ssb_tlist_init(&servers);
1194  nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_queued);
1195  /* Add queued buffer to per server list of queued buffers */
1196  for (i = 0; i < nr; ++i) {
1197  index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_queued);
1198  bs = &ctx->nbc_bs[index];
1199  ss = &ctx->nbc_sstatus[client_server_index(ctx, index)];
1200  bsb_tlist_add_tail(&ss->ssb_buffers, bs);
1201  if (!ssb_tlink_is_in(ss))
1202  ssb_tlist_add_tail(&servers, ss);
1203  }
1204  /* Send message with buffer descriptors to every server */
1205  m0_tl_teardown(ssb, &servers, ss) {
1206  client_bds_send(ctx, ss);
1207  }
1208  ssb_tlist_fini(&servers);
1209 }
1210 
1212 {
1213  size_t i;
1214 
1215  M0_PRE(ctx != NULL);
1216 
1217  for (i = 0; i < ctx->nbc_bs_nr; ++i) {
1218  ctx->nbc_bs[i].bsb_ts = TS_UNUSED;
1219  m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_unused, i);
1220  }
1221  for (i = 0; i < ctx->nbc_buf_ping_nr; ++i)
1222  m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused, i);
1223 }
1224 
1226 {
1227  size_t i;
1228 
1229  M0_PRE(ctx != NULL);
1230  for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
1233  }
1234  for (i = 0; i < ctx->nbc_buf_bulk_nr; ++i) {
1237  }
1238 }
1239 
1241 {
1242  M0_PRE(ctx != NULL);
1243 
1244  return m0_net_test_ringbuf_nr(&ctx->nbc_rb_ping_unused) ==
1245  ctx->nbc_buf_ping_nr &&
1246  m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_unused) ==
1247  ctx->nbc_bs_nr;
1248 }
1249 
1250 static void net_bulk_worker_cb(struct node_bulk_ctx *ctx, bool pending)
1251 {
1252  if (USE_LIBFAB) {
1253  /* execute network buffer callbacks in this thread context */
1254  m0_net_buffer_event_deliver_all(ctx->nbc_net.ntc_tm);
1255  M0_ASSERT(ergo(pending, ctx->nbc_callback_executed));
1256  /* update copy of statistics */
1258  /*
1259  * In case of libfabric, there are no pending callbacks,
1260  * hence do not wait for callback in m0_chan_wait() after
1261  * calling m0_net_buffer_event_deliver_all().
1262  */
1263  } else {
1264  ctx->nbc_callback_executed = false;
1265  /* execute network buffer callbacks in this thread context */
1266  m0_net_buffer_event_deliver_all(ctx->nbc_net.ntc_tm);
1267  M0_ASSERT(ergo(pending, ctx->nbc_callback_executed));
1268  /* state transitions from final states */
1270  /* update copy of statistics */
1272  /* wait for STOP command or buffer event */
1273  if (!ctx->nbc_callback_executed)
1274  m0_chan_wait(&ctx->nbc_stop_clink);
1275  }
1276 }
1277 
1278 static void node_bulk_worker(struct node_bulk_ctx *ctx)
1279 {
1280  struct m0_clink tm_clink;
1281  struct m0_chan tm_chan = {};
1282  struct m0_mutex tm_chan_mutex = {};
1283  bool pending;
1284  bool running;
1285 
1286  M0_PRE(ctx != NULL);
1287 
1288  /* all buffers are unused */
1290  /* attach tm_clink to clink group to wait for two signals at once */
1291  m0_clink_attach(&tm_clink, &ctx->nbc_stop_clink, NULL);
1292  /*
1293  * Init wait channel and clink.
1294  * Transfer machine will signal to this channel.
1295  */
1296  m0_mutex_init(&tm_chan_mutex);
1297  m0_chan_init(&tm_chan, &tm_chan_mutex);
1298  m0_clink_add_lock(&tm_chan, &tm_clink);
1299  /* main loop */
1300  running = true;
1301  while (running || !node_bulk_bufs_unused_all(ctx)) {
1302  if (running) {
1303  if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
1306  } else {
1308  }
1309  }
1310  /* notification for buffer events */
1311  pending = m0_net_buffer_event_pending(ctx->nbc_net.ntc_tm);
1312  if (!pending) {
1313  m0_net_buffer_event_notify(ctx->nbc_net.ntc_tm,
1314  &tm_chan);
1315  }
1316  net_bulk_worker_cb(ctx, pending);
1317  if (running && node_bulk_is_stopping(ctx)) {
1318  /* dequeue all queued network buffers */
1320  running = false;
1321  }
1322  }
 1323  /* transfer machine shouldn't signal to tm_chan */
1324  m0_net_buffer_event_notify(ctx->nbc_net.ntc_tm, NULL);
1325  m0_clink_del_lock(&tm_clink);
1326  m0_clink_fini(&tm_clink);
1327  m0_chan_fini_lock(&tm_chan);
1328  m0_mutex_fini(&tm_chan_mutex);
1329 }
1330 
1332  const struct m0_net_test_cmd *cmd)
1333 {
1334  struct m0_net_test_network_timeouts *timeouts;
1335  struct m0_net_test_network_cfg net_cfg;
1336  const struct m0_net_test_cmd_init *icmd;
1337  struct server_status_bulk *ss;
1338  struct buf_status_ping *msg_bs;
1339  int rc;
1340  size_t i;
1341  bool role_client;
1342  m0_time_t to_send;
1343  m0_time_t to_bulk;
1344  size_t nr;
1345 
1346  M0_PRE(ctx != NULL);
1347 
1348  if (cmd == NULL)
1349  goto fini;
1350  icmd = &cmd->ntc_init;
1351  role_client = icmd->ntci_role == M0_NET_TEST_ROLE_CLIENT;
1352 
1353  rc = -ENOMEM;
1354  M0_ALLOC_ARR(ctx->nbc_bs, ctx->nbc_bs_nr);
1355  if (ctx->nbc_bs == NULL)
1356  goto fail;
1357  for (i = 0; i < ctx->nbc_bs_nr; ++i) {
1358  ctx->nbc_bs[i].bsb_index = i;
1359  bsb_tlink_init(&ctx->nbc_bs[i]);
1360  M0_SET0(&ctx->nbc_bs[i].bsb_desc_send);
1361  }
1362 
1363  if (role_client) {
1364  M0_ALLOC_ARR(ctx->nbc_sstatus, icmd->ntci_ep.ntsl_nr);
1365  if (ctx->nbc_sstatus == NULL)
1366  goto free_bs_bulk;
1367  for (i = 0; i < icmd->ntci_ep.ntsl_nr; ++i) {
1368  ss = &ctx->nbc_sstatus[i];
1369  ss->ssb_index = i;
1370  bsb_tlist_init(&ss->ssb_buffers);
1371  ssb_tlink_init(ss);
1372  }
1373  M0_ALLOC_ARR(ctx->nbc_bsp, ctx->nbc_buf_ping_nr);
1374  if (ctx->nbc_bsp == NULL)
1375  goto free_sstatus;
1376  for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
1377  msg_bs = &ctx->nbc_bsp[i];
1378  msg_bs->bsp_index = i;
1379  bsb_tlist_init(&msg_bs->bsp_buffers);
1380  bsp_tlink_init(msg_bs);
1381  }
1382  }
1383 
1384  M0_ASSERT(equi(role_client, ctx->nbc_sstatus != NULL));
1385 
1386  rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_ping_unused,
1387  ctx->nbc_buf_ping_nr);
1388  if (rc != 0)
1389  goto free_bsp;
1390  rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_unused,
1391  ctx->nbc_bs_nr);
1392  if (rc != 0)
1393  goto free_rb_ping_unused;
1394  rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_queued,
1395  ctx->nbc_bs_nr);
1396  if (rc != 0)
1397  goto free_rb_bulk_unused;
1398  rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_final,
1399  ctx->nbc_bs_nr);
1400  if (rc != 0)
1401  goto free_rb_bulk_queued;
1402 
1403  M0_SET0(&net_cfg);
1404  net_cfg.ntncfg_tm_cb = node_bulk_tm_cb;
1405  net_cfg.ntncfg_buf_cb = node_bulk_buf_cb;
1406  net_cfg.ntncfg_buf_size_ping = ctx->nbc_buf_size_ping,
1407  net_cfg.ntncfg_buf_ping_nr = ctx->nbc_buf_ping_nr,
1408  net_cfg.ntncfg_buf_size_bulk = ctx->nbc_buf_size_bulk,
1409  net_cfg.ntncfg_buf_bulk_nr = ctx->nbc_buf_bulk_nr,
1410  net_cfg.ntncfg_ep_max = icmd->ntci_ep.ntsl_nr,
1412  net_cfg.ntncfg_sync = true;
1413  /* configure timeouts */
1414  to_send = icmd->ntci_buf_send_timeout;
1415  to_bulk = icmd->ntci_buf_bulk_timeout;
1416  timeouts = &net_cfg.ntncfg_timeouts;
1417  timeouts->ntnt_timeout[M0_NET_QT_MSG_SEND] = to_send;
1418  timeouts->ntnt_timeout[M0_NET_QT_PASSIVE_BULK_RECV] = to_bulk;
1419  timeouts->ntnt_timeout[M0_NET_QT_PASSIVE_BULK_SEND] = to_bulk;
1420  timeouts->ntnt_timeout[M0_NET_QT_ACTIVE_BULK_RECV] = to_bulk;
1421  timeouts->ntnt_timeout[M0_NET_QT_ACTIVE_BULK_SEND] = to_bulk;
1422 
1423  rc = m0_net_test_network_ctx_init(&ctx->nbc_net, &net_cfg,
1424  icmd->ntci_tm_ep);
1425  if (rc != 0)
1426  goto free_rb_bulk_final;
1427  rc = m0_net_test_network_ep_add_slist(&ctx->nbc_net, &icmd->ntci_ep);
1428  if (rc != 0)
1429  goto fini;
1430  m0_mutex_init(&ctx->nbc_stop_chan_mutex);
1431  m0_mutex_init(&ctx->nbc_bulk_mutex);
1432  m0_chan_init(&ctx->nbc_stop_chan, &ctx->nbc_stop_chan_mutex);
1433  m0_clink_init(&ctx->nbc_stop_clink, NULL);
1434  m0_clink_add_lock(&ctx->nbc_stop_chan, &ctx->nbc_stop_clink);
1435  goto success;
1436 fini:
1437  rc = 0;
1438  m0_clink_del_lock(&ctx->nbc_stop_clink);
1439  m0_clink_fini(&ctx->nbc_stop_clink);
1440  m0_chan_fini_lock(&ctx->nbc_stop_chan);
1441  m0_mutex_fini(&ctx->nbc_stop_chan_mutex);
1442  m0_mutex_fini(&ctx->nbc_bulk_mutex);
1443  m0_net_test_network_ctx_fini(&ctx->nbc_net);
1444 free_rb_bulk_final:
1445  m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_final);
1446 free_rb_bulk_queued:
1447  m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_queued);
1448 free_rb_bulk_unused:
1449  m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_unused);
1450 free_rb_ping_unused:
1451  m0_net_test_ringbuf_fini(&ctx->nbc_rb_ping_unused);
1452 free_bsp:
1453  if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
1454  for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
1455  msg_bs = &ctx->nbc_bsp[i];
1456  bsp_tlink_init(msg_bs);
1457  bsb_tlist_fini(&msg_bs->bsp_buffers);
1458  }
1459  m0_free(ctx->nbc_bsp);
1460  }
1461 free_sstatus:
1462  if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
1463  nr = cmd != NULL ? cmd->ntc_init.ntci_ep.ntsl_nr :
1464  ctx->nbc_net.ntc_ep_nr;
1465  for (i = 0; i < nr; ++i) {
1466  ss = &ctx->nbc_sstatus[i];
1467  ssb_tlink_fini(ss);
1468  bsb_tlist_fini(&ss->ssb_buffers);
1469  }
1470  m0_free(ctx->nbc_sstatus);
1471  }
1472 free_bs_bulk:
1473  for (i = 0; i < ctx->nbc_bs_nr; ++i)
1474  bsb_tlink_fini(&ctx->nbc_bs[i]);
1475  m0_free(ctx->nbc_bs);
1476 fail:
1477 success:
1478  return rc;
1479 }
1480 
/**
 * Combined init/fini helper for the bulk test service context.
 *
 * Called with (NULL, svc) to allocate and initialise a fresh context for
 * service @svc, or with (ctx, NULL) to finalise and free an existing
 * context.  Exactly one of the two arguments is non-NULL (enforced by the
 * precondition below).
 *
 * @param ctx_ context to finalise (fini mode), or NULL (init mode).
 * @param svc  owning service (init mode), or NULL (fini mode).
 * @return in init mode, pointer to the new context or NULL on allocation
 *	   failure; in fini mode, always NULL.
 */
static void *node_bulk_initfini(void *ctx_, struct m0_net_test_service *svc)
{
	struct node_bulk_ctx *ctx;
	int rc;

	M0_PRE(equi(ctx_ == NULL, svc != NULL));

	if (svc != NULL) {
		/* init: allocate the context; the test state itself is set
		 * up later by the INIT command handler. */
		M0_ALLOC_PTR(ctx);
		if (ctx != NULL) {
			ctx->nbc_svc = svc;
			ctx->nbc_nh.ntnh_test_initialized = false;
		}
	} else {
		ctx = ctx_;
		/*
		 * NOTE(review): 'rc' is asserted below but no assignment to
		 * it is visible in this branch — a statement (presumably the
		 * test-resources fini call whose result is being checked)
		 * appears to be missing here; confirm against VCS.
		 */
		M0_ASSERT(rc == 0);
		m0_free(ctx);
	}
	return svc != NULL ? ctx : NULL;
}
1502 
1504 {
1505  return node_bulk_initfini(NULL, svc);
1506 }
1507 
1508 static void node_bulk_fini(void *ctx_)
1509 {
1510  void *rc = node_bulk_initfini(ctx_, NULL);
1511  M0_POST(rc == NULL);
1512 }
1513 
/**
 * Service step handler.  The bulk test node performs all of its work in a
 * dedicated worker thread (see node_bulk_cmd_start()), so the periodic
 * service step has nothing to do.
 *
 * @return always 0.
 */
static int node_bulk_step(void *ctx_)
{
	(void)ctx_;	/* context is not needed for the no-op step */
	return 0;
}
1518 
/**
 * INIT command handler: copies test parameters from the received command
 * into the node context and sanity-checks them, then fills the reply.
 *
 * @param ctx_  node_bulk_ctx (service-private data).
 * @param cmd   received command; its ntc_init part is used.
 * @param reply reply to fill (M0_NET_TEST_CMD_INIT_DONE, errno = result).
 * @return 0 on success, -EALREADY if the test was already initialised,
 *	   -EINVAL if the parameters fail the sanity check.
 */
static int node_bulk_cmd_init(void *ctx_,
			      const struct m0_net_test_cmd *cmd,
			      struct m0_net_test_cmd *reply)
{
	const struct m0_net_test_cmd_init *icmd;
	struct node_bulk_ctx *ctx = ctx_;
	int rc;
	bool role_client;

	M0_PRE(ctx != NULL);
	M0_PRE(cmd != NULL);
	M0_PRE(reply != NULL);

	/* double initialisation is not allowed */
	if (ctx->nbc_nh.ntnh_test_initialized) {
		rc = -EALREADY;
		goto reply;
	}
	icmd = &cmd->ntc_init;
	m0_net_test_nh_init(&ctx->nbc_nh, icmd);
	role_client = icmd->ntci_role == M0_NET_TEST_ROLE_CLIENT;
	/* copy buffer sizes/counts and concurrency from the command */
	ctx->nbc_buf_size_bulk = icmd->ntci_msg_size;
	ctx->nbc_buf_ping_nr = icmd->ntci_bd_buf_nr;
	ctx->nbc_buf_size_ping = icmd->ntci_bd_buf_size;
	ctx->nbc_bd_nr_max = icmd->ntci_bd_nr_max;
	ctx->nbc_bs_nr = icmd->ntci_msg_concurrency;
	ctx->nbc_buf_bulk_nr = icmd->ntci_msg_concurrency;

	if (role_client) {
		ctx->nbc_client_concurrency = icmd->ntci_msg_concurrency;
		/* a client needs 2 bulk buffers per concurrent transfer,
		 * per server endpoint; buf-status slots are one per pair */
		ctx->nbc_buf_bulk_nr *= 2 * icmd->ntci_ep.ntsl_nr;
		ctx->nbc_bs_nr = ctx->nbc_buf_bulk_nr / 2;
	}

	/* do sanity check */
	rc = -EINVAL;
	if (!ergo(role_client, ctx->nbc_buf_bulk_nr % 2 == 0) ||
	    ctx->nbc_buf_bulk_nr < 1 || ctx->nbc_buf_size_bulk < 1 ||
	    ctx->nbc_buf_ping_nr < 1 || ctx->nbc_buf_size_ping < 1 ||
	    ctx->nbc_bd_nr_max < 1 || ctx->nbc_bs_nr < 1 ||
	    (ctx->nbc_nh.ntnh_role != M0_NET_TEST_ROLE_CLIENT &&
	     ctx->nbc_nh.ntnh_role != M0_NET_TEST_ROLE_SERVER) ||
	    !ergo(role_client, ctx->nbc_client_concurrency != 0) ||
	    !ergo(!role_client, ctx->nbc_bs_nr == ctx->nbc_buf_bulk_nr) ||
	    !ergo(role_client, 2 * ctx->nbc_bs_nr == ctx->nbc_buf_bulk_nr))
		goto reply;

	/*
	 * NOTE(review): the statement that sets 'rc' on the success path
	 * (presumably the call that allocates the test resources) is not
	 * visible at this point in the dump; confirm against VCS.
	 */
reply:
	/* fill reply */
	reply->ntc_type = M0_NET_TEST_CMD_INIT_DONE;
	reply->ntc_done.ntcd_errno = rc;
	return rc;
}
1572 
/**
 * START command handler: records the test start time, resets the stop
 * flag and spawns the worker thread that drives the bulk transfers.
 *
 * @param ctx_  node_bulk_ctx (service-private data).
 * @param cmd   received command (only checked non-NULL here).
 * @param reply reply to fill (M0_NET_TEST_CMD_START_DONE, errno = result).
 * @return result of the worker thread creation.
 */
static int node_bulk_cmd_start(void *ctx_,
			       const struct m0_net_test_cmd *cmd,
			       struct m0_net_test_cmd *reply)
{
	struct m0_net_test_cmd_status_data *sd;
	int rc;
	struct node_bulk_ctx *ctx = ctx_;
	/*
	 * NOTE(review): '_1s' has no visible use in this dump — the
	 * statistics-initialisation statements that presumably consume it
	 * are missing here; confirm against VCS.
	 */
	const m0_time_t _1s = M0_MKTIME(1, 0);

	M0_PRE(ctx != NULL);
	M0_PRE(cmd != NULL);
	M0_PRE(reply != NULL);

	sd = &ctx->nbc_nh.ntnh_sd;

	/* fill test start time */
	sd->ntcsd_time_start = m0_time_now();
	/* initialize stats */
	m0_atomic64_set(&ctx->nbc_stop_flag, 0);
	/* spawn the worker thread that performs all transfers */
	rc = M0_THREAD_INIT(&ctx->nbc_thread, struct node_bulk_ctx *, NULL,
			    &node_bulk_worker, ctx, "net-test bulk");
	if (rc != 0) {
		/* change service state */
		/*
		 * NOTE(review): the failure-path body (presumably the
		 * service state change) is not visible in this dump;
		 * confirm against VCS.
		 */
	}
	/* fill reply */
	reply->ntc_type = M0_NET_TEST_CMD_START_DONE;
	reply->ntc_done.ntcd_errno = rc;
	return rc;
}
1608 
/**
 * STOP command handler: signals the worker thread to stop, joins it and
 * fills the reply.
 *
 * @param ctx_  node_bulk_ctx (service-private data).
 * @param cmd   received command (only checked non-NULL here).
 * @param reply reply to fill (M0_NET_TEST_CMD_STOP_DONE; errno is -EINVAL
 *		if the test was never initialised, 0 otherwise).
 * @return always 0 (errors are reported through the reply).
 */
static int node_bulk_cmd_stop(void *ctx_,
			      const struct m0_net_test_cmd *cmd,
			      struct m0_net_test_cmd *reply)
{
	struct node_bulk_ctx *ctx = ctx_;
	int rc;

	M0_PRE(ctx != NULL);
	M0_PRE(cmd != NULL);
	M0_PRE(reply != NULL);

	/* stopping a test that was never initialised is an error */
	if (!ctx->nbc_nh.ntnh_test_initialized) {
		reply->ntc_done.ntcd_errno = -EINVAL;
		goto reply;
	}
	/* stop worker thread: raise the flag, then wake the worker */
	m0_atomic64_set(&ctx->nbc_stop_flag, 1);
	m0_chan_signal_lock(&ctx->nbc_stop_chan);
	rc = m0_thread_join(&ctx->nbc_thread);
	M0_ASSERT(rc == 0);
	m0_thread_fini(&ctx->nbc_thread);
	/* change service state */
	/*
	 * NOTE(review): the state-change statement announced by the comment
	 * above is not visible in this dump; confirm against VCS.
	 */
	/* fill reply */
	reply->ntc_done.ntcd_errno = 0;
reply:
	reply->ntc_type = M0_NET_TEST_CMD_STOP_DONE;
	return 0;
}
1640 
1641 static int node_bulk_cmd_status(void *ctx_,
1642  const struct m0_net_test_cmd *cmd,
1643  struct m0_net_test_cmd *reply)
1644 {
1645  struct node_bulk_ctx *ctx = ctx_;
1646 
1647  M0_PRE(ctx != NULL);
1648 
1649  m0_net_test_nh_cmd_status(&ctx->nbc_nh, cmd, reply);
1650  return 0;
1651 }
1652 
1654  {
1656  .ntsch_handler = node_bulk_cmd_init,
1657  },
1658  {
1659  .ntsch_type = M0_NET_TEST_CMD_START,
1660  .ntsch_handler = node_bulk_cmd_start,
1661  },
1662  {
1663  .ntsch_type = M0_NET_TEST_CMD_STOP,
1664  .ntsch_handler = node_bulk_cmd_stop,
1665  },
1666  {
1667  .ntsch_type = M0_NET_TEST_CMD_STATUS,
1668  .ntsch_handler = node_bulk_cmd_status,
1669  },
1670 };
1671 
1674  .ntso_fini = node_bulk_fini,
1675  .ntso_step = node_bulk_step,
1676  .ntso_cmd_handler = node_bulk_cmd_handler,
1677  .ntso_cmd_handler_nr = ARRAY_SIZE(node_bulk_cmd_handler),
1678 };
1679 
1681 {
1683  return 0;
1684 }
1685 
1687 {
1688 }
1689 
1690 #undef NET_TEST_MODULE_NAME
1691 
1694 /*
1695  * Local variables:
1696  * c-indentation-style: "K&R"
1697  * c-basic-offset: 8
1698  * tab-width: 8
1699  * fill-column: 79
1700  * scroll-step: 1
1701  * End:
1702  */
int m0_net_test_node_bulk_init(void)
Definition: node_bulk.c:1680
void m0_net_test_network_buffer_dequeue(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, int32_t buf_index)
Definition: network.c:500
static void transition(void)
Definition: sm.c:88
enum m0_net_test_cmd_type ntsch_type
Definition: service.h:52
static const struct state_transition node_bulk_client_success[]
Definition: node_bulk.c:405
struct m0_net_test_network_timeouts m0_net_test_network_timeouts_never(void)
Definition: network.c:803
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
size_t nbc_client_concurrency
Definition: node_bulk.c:260
struct m0_tlink bsb_tlink
Definition: node_bulk.c:126
static void node_bulk_tm_event_cb(const struct m0_net_tm_event *ev)
Definition: node_bulk.c:294
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static void client_transfer_start(struct node_bulk_ctx *ctx, size_t bs_index)
Definition: node_bulk.c:959
m0_net_test_nh_msg_type
Definition: node_helper.h:48
static int node_bulk_cmd_stop(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_bulk.c:1610
struct m0_net_test_slist ntci_ep
Definition: commands.h:157
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct buf_status_errno bsb_send
Definition: node_bulk.c:133
#define USE_LIBFAB
Definition: net.h:99
uint32_t ntncfg_buf_ping_nr
Definition: network.h:86
m0_time_t ntci_buf_bulk_timeout
Definition: commands.h:153
static struct m0_semaphore q
Definition: rwlock.c:55
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
int m0_net_test_network_msg_recv(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
Definition: network.c:469
static void buf_desc_set0(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
Definition: node_bulk.c:605
int m0_net_test_network_ctx_init(struct m0_net_test_network_ctx *ctx, struct m0_net_test_network_cfg *cfg, const char *tm_addr)
Definition: network.c:367
static int node_bulk_test_init_fini(struct node_bulk_ctx *ctx, const struct m0_net_test_cmd *cmd)
Definition: node_bulk.c:1331
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
struct m0_net_test_mps ntcsd_mps_recv
Definition: commands.h:186
#define ergo(a, b)
Definition: misc.h:293
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
void m0_net_test_service_state_change(struct m0_net_test_service *svc, enum m0_net_test_service_state state)
Definition: service.c:147
static size_t client_server_index(struct node_bulk_ctx *ctx, size_t bs_index)
Definition: node_bulk.c:930
uint64_t m0_time_t
Definition: time.h:37
struct m0_tlink ssb_tlink
Definition: node_bulk.c:173
void m0_net_test_nh_sd_copy_locked(struct m0_net_test_nh *nh)
Definition: node_helper.c:63
static void buf_desc_swap(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
Definition: node_bulk.c:619
m0_bcount_t m0_net_test_network_bd_serialize(enum m0_net_test_serialize_op op, struct m0_net_test_network_ctx *ctx, uint32_t buf_bulk_index, uint32_t buf_ping_index, m0_bcount_t offset)
Definition: network.c:655
M0_TL_DESCR_DEFINE(bsb, "buf_status_bulk", static, struct buf_status_bulk, bsb_tlink, bsb_magic, M0_NET_TEST_BSB_MAGIC, M0_NET_TEST_BSB_HEAD_MAGIC)
m0_net_test_network_buffer_cb_proc_t ntnbc_cb[M0_NET_QT_NR]
Definition: network.h:66
int m0_net_test_ringbuf_init(struct m0_net_test_ringbuf *rb, size_t size)
Definition: ringbuf.c:36
static void node_bulk_cb_server(struct node_bulk_ctx *ctx, size_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
Definition: node_bulk.c:749
M0_TL_DEFINE(bsb, static, struct buf_status_bulk)
struct buf_status_errno bsb_recv
Definition: node_bulk.c:138
static void buf_desc_client_free(struct node_bulk_ctx *ctx, size_t bs_index)
Definition: node_bulk.c:639
void m0_net_test_node_bulk_fini(void)
Definition: node_bulk.c:1686
void m0_net_test_nh_sd_update_rtt(struct m0_net_test_nh *nh, m0_time_t rtt)
Definition: node_helper.c:129
static const struct state_transition node_bulk_server_success[]
Definition: node_bulk.c:419
static const struct state_transition node_bulk_client_failure[]
Definition: node_bulk.c:412
struct m0_net_test_ringbuf nbc_rb_bulk_queued
Definition: node_bulk.c:250
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
uint64_t ssb_magic
Definition: node_bulk.c:169
static void node_bulk_buf_unused(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:1211
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
static m0_bcount_t net_test_len_accumulate(m0_bcount_t accumulator, m0_bcount_t addend)
Definition: serialize.h:122
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
uint64_t bsp_magic
Definition: node_bulk.c:153
m0_time_t ntnt_timeout[M0_NET_QT_NR]
Definition: network.h:71
#define M0_SWAP(v0, v1)
Definition: arith.h:207
struct buf_status_bulk * nbc_bs
Definition: node_bulk.c:235
uint64_t bsb_magic
Definition: node_bulk.c:117
size_t bsp_index
Definition: node_bulk.c:155
static int client_bulk_enqueue(struct node_bulk_ctx *ctx, size_t buf_index)
Definition: node_bulk.c:939
m0_bcount_t ntci_msg_size
Definition: commands.h:118
#define m0_tl_endfor
Definition: tlist.h:700
m0_net_test_nh_msg_status
Definition: node_helper.h:42
static const struct m0_net_tm_callbacks node_bulk_tm_cb
Definition: node_bulk.c:299
size_t bsb_index
Definition: node_bulk.c:124
#define equi(a, b)
Definition: misc.h:297
m0_time_t bsb_time_finish
Definition: node_bulk.c:147
m0_net_test_nh_msg_direction
Definition: node_helper.h:55
uint64_t bsb_magic
Definition: balloc.h:634
int m0_net_test_network_bulk_enqueue(struct m0_net_test_network_ctx *ctx, int32_t buf_bulk_index, int32_t ep_index, enum m0_net_queue_type q)
Definition: network.c:479
int i
Definition: dir.c:1033
size_t m0_net_test_ringbuf_pop(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:88
int32_t nbe_status
Definition: net.h:1218
#define LOGD(...)
Definition: tx_regmap.c:37
m0_time_t ntci_buf_send_timeout
Definition: commands.h:151
static void * node_bulk_initfini(void *ctx_, struct m0_net_test_service *svc)
Definition: node_bulk.c:1481
bool m0_net_test_nh_transfer_next(struct m0_net_test_nh *nh)
Definition: node_helper.c:136
struct buf_status_ping * nbc_bsp
Definition: node_bulk.c:239
M0_INTERNAL void m0_clink_attach(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
Definition: chan.c:215
uint64_t ntci_bd_nr_max
Definition: commands.h:136
m0_bcount_t ntncfg_buf_size_bulk
Definition: network.h:88
enum transfer_state sta_from
Definition: node_bulk.c:185
void node_bulk_state_transition_auto(struct node_bulk_ctx *ctx, size_t bs_index)
Definition: node_bulk.c:521
static void sd_update(struct node_bulk_ctx *ctx, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
Definition: node_bulk.c:286
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
static void client_bulk_dequeue(struct node_bulk_ctx *ctx, size_t buf_index)
Definition: node_bulk.c:952
static void client_bulk_bufs_dequeue(struct node_bulk_ctx *ctx, struct buf_status_bulk *bs)
Definition: node_bulk.c:1053
static void node_bulk_state_check_all(void)
Definition: node_bulk.c:459
void m0_net_test_ringbuf_fini(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:52
size_t nbc_bd_nr_max
Definition: node_bulk.c:233
struct m0_net_test_cmd_init ntc_init
Definition: commands.h:208
enum m0_net_test_role ntci_role
Definition: commands.h:104
struct m0_net_test_service * nbc_svc
Definition: node_bulk.c:221
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
struct m0_net_test_network_buffer_callbacks ntncfg_buf_cb
Definition: network.h:82
#define M0_ASSERT(cond)
static int node_bulk_cmd_status(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_bulk.c:1641
static void net_bulk_worker_cb(struct node_bulk_ctx *ctx, bool pending)
Definition: node_bulk.c:1250
static char servers[NTCS_NODES_MAX *NTCS_NODE_ADDR_MAX]
Definition: client_server.c:63
struct m0_chan nbc_stop_chan
Definition: node_bulk.c:267
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
m0_time_t m0_time_now(void)
Definition: time.c:134
m0_bcount_t nbc_buf_size_ping
Definition: node_bulk.c:229
size_t nbc_buf_bulk_nr
Definition: node_bulk.c:227
struct server_status_bulk * nbc_sstatus
Definition: node_bulk.c:262
Definition: tlist.h:251
struct m0_net_test_mps ntcsd_mps_send
Definition: commands.h:184
void m0_net_test_nh_cmd_status(struct m0_net_test_nh *nh, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_helper.c:146
struct m0_tl bsp_buffers
Definition: node_bulk.c:161
const size_t nbst_nr
Definition: node_bulk.c:430
static void client_process_unused_bulk(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:988
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
void m0_net_test_ringbuf_push(struct m0_net_test_ringbuf *rb, size_t value)
Definition: ringbuf.c:77
static void * node_bulk_init(struct m0_net_test_service *svc)
Definition: node_bulk.c:1503
static m0_bcount_t buf_desc_deserialize(struct node_bulk_ctx *ctx, size_t buf_bulk_index, size_t buf_ping_index, m0_bcount_t offset)
Definition: node_bulk.c:649
void m0_net_test_network_ctx_fini(struct m0_net_test_network_ctx *ctx)
Definition: network.c:374
struct m0_net_test_ringbuf nbc_rb_bulk_final
Definition: node_bulk.c:254
size_t ntsl_nr
Definition: slist.h:49
void m0_net_test_nh_sd_update(struct m0_net_test_nh *nh, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
Definition: node_helper.c:83
static bool node_bulk_state_is_final(enum transfer_state state)
Definition: node_bulk.c:327
#define TRANSITION(name)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static enum transfer_state node_bulk_state_search(enum transfer_state state, const struct state_transition state_list[], size_t state_nr)
Definition: node_bulk.c:471
static void node_bulk_worker(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:1278
static void node_bulk_state_check(const struct state_transition state_list[], size_t state_nr)
Definition: node_bulk.c:444
#define M0_POST(cond)
enum transfer_state bsb_ts
Definition: node_bulk.c:119
struct m0_tlink bsp_tlink
Definition: node_bulk.c:163
m0_bcount_t ntci_bd_buf_size
Definition: commands.h:130
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
Definition: net.c:87
m0_net_test_role
Definition: commands.h:59
static m0_bcount_t client_bds_serialize2(struct m0_net_test_network_ctx *net, size_t bsb_index, size_t msg_buf_index, m0_bcount_t offset)
Definition: node_bulk.c:1017
Definition: chan.h:229
static m0_bindex_t offset
Definition: dump.c:173
struct m0_net_test_nh nbc_nh
Definition: node_bulk.c:217
uint64_t ntci_msg_concurrency
Definition: commands.h:144
int m0_net_test_network_msg_send(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index, uint32_t ep_index)
Definition: network.c:457
static void node_bulk_fini(void *ctx_)
Definition: node_bulk.c:1508
size_t m0_net_test_network_bd_nr(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
Definition: network.c:712
struct m0_atomic64 nbc_stop_flag
Definition: node_bulk.c:277
static struct fdmi_ctx ctx
Definition: main.c:80
transfer_state
Definition: node_bulk.c:89
struct buf_status_errno bsb_msg
Definition: node_bulk.c:128
M0_INTERNAL void m0_chan_signal_lock(struct m0_chan *chan)
Definition: chan.c:165
static struct m0_net_test_network_buffer_callbacks node_bulk_buf_cb
Definition: node_bulk.c:918
M0_INTERNAL void m0_net_buffer_event_deliver_all(struct m0_net_transfer_mc *tm)
Definition: tm.c:397
size_t nbc_bs_nr
Definition: node_bulk.c:237
uint64_t ntci_bd_buf_nr
Definition: commands.h:124
enum transfer_state sta_to
Definition: node_bulk.c:186
struct m0_mutex nbc_stop_chan_mutex
Definition: node_bulk.c:269
size_t m0_net_test_ringbuf_nr(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:106
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
struct m0_net_transfer_mc * ntc_tm
Definition: network.h:113
void m0_net_test_mps_init(struct m0_net_test_mps *mps, unsigned long messages, m0_time_t timestamp, m0_time_t interval)
Definition: stats.c:227
struct m0_net_buffer * m0_net_test_network_buf(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, uint32_t buf_index)
Definition: network.c:725
static struct m0_net_buffer * net_buf_bulk_get(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
Definition: node_bulk.c:598
M0_INTERNAL void m0_net_buffer_event_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
Definition: tm.c:423
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
const struct state_transition * nbst_transition
Definition: node_bulk.c:429
struct m0_net_test_ringbuf nbc_rb_bulk_unused
Definition: node_bulk.c:245
uint32_t ntncfg_ep_max
Definition: network.h:92
M0_INTERNAL bool m0_net_buffer_event_pending(struct m0_net_transfer_mc *tm)
Definition: tm.c:409
size_t nbc_buf_ping_nr
Definition: node_bulk.c:225
struct m0_net_end_point * ntm_ep
Definition: net.h:868
m0_net_queue_type
Definition: net.h:591
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static bool node_bulk_is_stopping(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:841
m0_bcount_t ntncfg_buf_size_ping
Definition: network.h:84
void m0_net_test_nh_init(struct m0_net_test_nh *nh, const struct m0_net_test_cmd_init *icmd)
Definition: node_helper.c:34
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
struct m0_clink nbc_stop_clink
Definition: node_bulk.c:275
static struct m0_net_test_service svc
Definition: service.c:34
static void node_bulk_state_change_cb(struct node_bulk_ctx *ctx, size_t bs_index, bool success)
Definition: node_bulk.c:485
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
struct m0_net_test_network_timeouts ntncfg_timeouts
Definition: network.h:97
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static int node_bulk_step(void *ctx_)
Definition: node_bulk.c:1514
static int node_bulk_cmd_init(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_bulk.c:1519
bool m0_net_test_ringbuf_is_empty(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:99
struct m0_net_test_service_ops m0_net_test_node_bulk_ops
Definition: node_bulk.c:1672
struct m0_net_buf_desc bsb_desc_send
Definition: node_bulk.c:143
uint32_t ntncfg_buf_bulk_nr
Definition: network.h:90
int fini(struct workload *w)
static void node_bulk_state_change(struct node_bulk_ctx *ctx, size_t bs_index, enum transfer_state state)
Definition: node_bulk.c:332
#define M0_MKTIME(secs, ns)
Definition: time.h:86
int m0_net_test_network_ep_add_slist(struct m0_net_test_network_ctx *ctx, const struct m0_net_test_slist *eps)
Definition: network.c:400
static struct m0_addb2_net * net
Definition: net.c:27
static bool node_bulk_bufs_unused_all(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:1240
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
void node_bulk_state_transition_auto_all(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:555
static void client_process_queued_bulk(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:1182
static void client_bds_send(struct node_bulk_ctx *ctx, struct server_status_bulk *ss)
Definition: node_bulk.c:1060
static const struct state_transition node_bulk_server_failure[]
Definition: node_bulk.c:423
static bool node_bulk_state_change_allowed(enum transfer_state from, enum transfer_state to, const struct state_transition allowed[], size_t allowed_size)
Definition: node_bulk.c:310
Definition: nucleus.c:42
struct m0_mutex nbc_bulk_mutex
Definition: node_bulk.c:241
static void node_bulk_cb(struct m0_net_test_network_ctx *net_ctx, const uint32_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
Definition: node_bulk.c:846
static struct node_bulk_ctx * node_bulk_ctx_from_net_ctx(struct m0_net_test_network_ctx *net_ctx)
Definition: node_bulk.c:304
void *(* ntso_init)(struct m0_net_test_service *svc)
Definition: service.h:76
int type
Definition: dir.c:1031
static struct m0_net_test_service_cmd_handler node_bulk_cmd_handler[]
Definition: node_bulk.c:1653
static void server_process_unused_ping(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:571
static m0_bcount_t node_bulk_server_transfer_start(struct node_bulk_ctx *ctx, size_t buf_ping_index, m0_bcount_t offset)
Definition: node_bulk.c:702
static void node_bulk_cb_client(struct node_bulk_ctx *ctx, size_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
Definition: node_bulk.c:809
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
m0_bcount_t nbc_buf_size_bulk
Definition: node_bulk.c:231
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
static const struct @411 node_bulk_state_transitions[]
m0_time_t bsb_time_start
Definition: node_bulk.c:145
static void node_bulk_buf_dequeue(struct node_bulk_ctx *ctx)
Definition: node_bulk.c:1225
static int node_bulk_cmd_start(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_bulk.c:1574
int32_t rc
Definition: trigger_fop.h:47
void m0_net_test_network_bd_nr_dec(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
Definition: network.c:718
#define ARRAY_SIZE(a)
Definition: misc.h:45
struct m0_net_tm_callbacks ntncfg_tm_cb
Definition: network.h:80
struct m0_thread nbc_thread
Definition: node_bulk.c:223
struct m0_net_test_network_ctx nbc_net
Definition: node_bulk.c:219
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
struct m0_tl ssb_buffers
Definition: node_bulk.c:175
struct m0_net_test_ringbuf nbc_rb_ping_unused
Definition: node_bulk.c:243
bool nbc_callback_executed
Definition: node_bulk.c:282
static void buf_desc_server_free(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
Definition: node_bulk.c:629
#define M0_IMPOSSIBLE(fmt,...)