Motr  M0
bulk_if.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/types.h"
24 #include "lib/arith.h"
25 #include "lib/assert.h"
26 #include "lib/errno.h"
27 #include "lib/misc.h"
28 #include "lib/memory.h"
29 #include "lib/mutex.h"
30 #include "ut/ut.h"
31 
32 #include "net/net_internal.h"
33 
/* UT-global fake network domain and transfer machine used throughout the
 * test.  (Stale Doxygen line numbers prefix each line of this listing.) */
34 static struct m0_net_domain utdom;
35 static struct m0_net_transfer_mc ut_tm;
37 
/* Forward declaration: fabricates a fake network buffer descriptor. */
38 static void make_desc(struct m0_net_buf_desc *desc);
39 
/* Debug print macro: compiled out by default (#if 0). */
40 #if 0
41 #define KPRN(fmt,...) printk(KERN_ERR fmt, ## __VA_ARGS__)
42 #else
43 #define KPRN(fmt,...)
44 #endif
45 
/* Sleep for the given number of milliseconds (ms -> ns conversion). */
46 #define DELAY_MS(ms) m0_nanosleep(m0_time(0, (ms) * 1000000ULL), NULL)
47 
48 /*
49  *****************************************************
50  Fake transport for the UT
51  *****************************************************
52 */
/* Fake transport private data; its address is planted in
 * dom->nd_xprt_private by ut_dom_init() and checked by the test. */
53 static struct {
54  int num;
55 } ut_xprt_pvt;
56 
/* "Was this xprt op invoked?" flags, set by the fake ops and inspected by
 * the test body.  NOTE(review): several sibling flags (lines 59-64, 66-67,
 * 74-81 of the original) are elided in this listing. */
57 static bool ut_dom_init_called;
58 static bool ut_dom_fini_called;
65 static const struct m0_bitmap *ut_tm_confine_bm;
68 static bool ut_buf_del_called;
69 static bool ut_buf_add_called;
70 static bool ut_tm_init_called;
71 static bool ut_tm_fini_called;
72 static bool ut_tm_start_called;
73 static bool ut_tm_stop_called;
82 
/*
 * Suite init: resets every "op called" flag so each run starts from a
 * known state.  Always succeeds.  NOTE(review): resets of a few more
 * flags (original lines 87-91, 93, 95, 102-109) are elided in this
 * listing.
 */
83 static int net_bulk_if_init(void)
84 {
85  ut_dom_init_called = false;
86  ut_dom_fini_called = false;
92  ut_tm_confine_called = false;
94  ut_buf_register_called = false;
96  ut_buf_del_called = false;
97  ut_buf_add_called = false;
98  ut_tm_init_called = false;
99  ut_tm_fini_called = false;
100  ut_tm_start_called = false;
101  ut_tm_stop_called = false;
104  ut_bev_pending_called = false;
108  ut_bev_notify_called = false;
110 
111  return 0;
112 }
113 
/*
 * Fake xo_dom_init(): must run outside the domain mutex; records the
 * invocation and plants the fake private data pointer that the test later
 * checks via dom->nd_xprt_private.  Always succeeds.
 */
114 static int ut_dom_init(const struct m0_net_xprt *xprt,
115  struct m0_net_domain *dom)
116 {
118  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
124  ut_dom_fini_called = false;
125  ut_dom_init_called = true;
126  dom->nd_xprt_private = &ut_xprt_pvt;
127  return 0;
128 }
129 
/* Fake xo_dom_fini(): must run outside the domain mutex; only records
 * that it was invoked. */
130 static void ut_dom_fini(struct m0_net_domain *dom)
131 {
133  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
134  ut_dom_fini_called = true;
135 }
136 
137 /* params */
/* Fake transport limits: UT_MAX_BUF_SIZE, UT_MAX_BUF_SEGMENT_SIZE,
 * UT_MAX_BUF_SEGMENTS and UT_MAX_BUF_DESC_SIZE are declared here; the
 * enumerator lines (139-142) are elided in this listing. */
138 enum {
143 };
144 
146 {
148  return UT_MAX_BUF_SIZE;
149 }
150 
152  *dom)
153 {
156 }
157 
/* Fake xo_get_max_buffer_segments(): reports the transport's fixed
 * segments-per-buffer limit.  (A flag-setting line, 160, is elided.) */
158 static int32_t ut_get_max_buffer_segments(const struct m0_net_domain *dom)
159 {
161  return UT_MAX_BUF_SEGMENTS;
162 }
163 
165 {
166  return UT_MAX_BUF_DESC_SIZE;
167 }
168 
/* UT end point: the address string plus an embedded m0_net_end_point
 * (member `uep`, line 171, elided in this listing). */
169 struct ut_ep {
170  char *addr;
172 };
173 
/*
 * Ref-count release callback for a UT end point: runs under the owning
 * TM's mutex, unlinks the end point from the TM's list and frees the
 * containing ut_ep.
 */
175 static void ut_end_point_release(struct m0_ref *ref)
176 {
177  struct m0_net_end_point *ep;
178  struct ut_ep *utep;
179  struct m0_net_transfer_mc *tm;
181  ep = container_of(ref, struct m0_net_end_point, nep_ref);
183  tm = ep->nep_tm;
184  M0_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
185  m0_nep_tlist_del(ep);
186  ep->nep_tm = NULL;
187  utep = container_of(ep, struct ut_ep, uep);
188  m0_free(utep);
189 }
190 
/*
 * Fake xo_end_point_create(): runs under the TM mutex.  Rejects NULL
 * (dynamic) addresses with -ENOSYS, returns an existing end point with
 * its refcount bumped when the address is already known, otherwise
 * allocates a fresh ut_ep and links it onto the TM's end point list.
 * NOTE(review): the m0_ref_init() of the new end point (line 219) is
 * elided in this listing.
 */
191 static int ut_end_point_create(struct m0_net_end_point **epp,
192  struct m0_net_transfer_mc *tm,
193  const char *addr)
194 {
195  char *ap;
196  struct ut_ep *utep;
197  struct m0_net_end_point *ep;
198 
199  M0_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
201  if (addr == NULL) {
202  /* don't support dynamic */
203  return -ENOSYS;
204  }
205  ap = (char *)addr; /* avoid strdup; this is a ut! */
206  /* check if it's already on the domain list */
207  m0_tl_for(m0_nep, &tm->ntm_end_points, ep) {
208  utep = container_of(ep, struct ut_ep, uep);
209  if (strcmp(utep->addr, ap) == 0) {
210  m0_ref_get(&ep->nep_ref); /* refcnt++ */
211  *epp = ep;
212  return 0;
213  }
214  } m0_tl_endfor;
215  /* allocate a new end point */
216  M0_ALLOC_PTR(utep);
217  utep->addr = ap;
218  utep->uep.nep_addr = ap;
220  utep->uep.nep_tm = tm;
221  m0_nep_tlink_init_at_tail(&utep->uep, &tm->ntm_end_points);
222  *epp = &utep->uep;
223  return 0;
224 }
225 
/* Fake xo_buf_register(): records the call; always succeeds.
 * (An assertion line, 228, is elided in this listing.) */
226 static int ut_buf_register(struct m0_net_buffer *nb)
227 {
229  ut_buf_register_called = true;
230  return 0;
231 }
232 
/* Fake xo_buf_deregister(): no work to do for the fake transport.
 * (Lines 235-236, presumably an assert and flag set, are elided.) */
233 static void ut_buf_deregister(struct m0_net_buffer *nb)
234 {
237  return;
238 }
239 
/*
 * Fake xo_buf_add(): runs under the TM mutex.  For passive bulk queues
 * it fabricates the network descriptor (those case labels, lines 245-246,
 * are elided in this listing); records the call and succeeds.
 */
240 static int ut_buf_add(struct m0_net_buffer *nb)
241 {
242  M0_UT_ASSERT(m0_mutex_is_locked(&nb->nb_tm->ntm_mutex));
244  switch (nb->nb_qtype) {
247  /* passive bulk ops required to set nb_desc */
248  make_desc(&nb->nb_desc);
249  break;
250  default:
251  break;
252  }
253  ut_buf_add_called = true;
254  return 0;
255 }
256 
/*
 * Helper thread body spawned by ut_buf_del(): posts the buffer completion
 * event outside the TM mutex.  A cancelled buffer must complete with
 * -ECANCELED per the net API contract.  NOTE(review): the actual
 * m0_net_buffer_event_post() call (line 269) is elided in this listing.
 */
257 static void ut_post_del_thread(struct m0_net_buffer *nb)
258 {
259  struct m0_net_buffer_event ev = {
260  .nbe_buffer = nb,
261  .nbe_status = 0,
262  };
263  if (nb->nb_flags & M0_NET_BUF_CANCELLED)
264  ev.nbe_status = -ECANCELED; /* required behavior */
265  DELAY_MS(1);
266  ev.nbe_time = m0_time_now();
267 
268  /* post requested event */
270 }
271 
/*
 * Fake xo_buf_del(): runs under the TM mutex, so the completion event is
 * posted from a background thread instead (posting here would assert).
 * NOTE(review): the M0_THREAD_INIT() launching ut_post_del_thread (lines
 * 280-281) is elided in this listing.
 */
273 static void ut_buf_del(struct m0_net_buffer *nb)
274 {
275  int rc;
277  M0_UT_ASSERT(m0_mutex_is_locked(&nb->nb_tm->ntm_mutex));
278  ut_buf_del_called = true;
279  if (!(nb->nb_flags & M0_NET_BUF_IN_USE))
282  &ut_post_del_thread, nb, "ut_post_del");
283  M0_UT_ASSERT(rc == 0);
284  return;
285 }
286 
/* TM private data: back-pointer member `tm` (line 288) is elided in this
 * listing; ut_tm_fini() checks tmp->tm == tm. */
287 struct ut_tm_pvt {
289 };
/*
 * Fake xo_tm_init(): allocates per-TM private data with a back-pointer to
 * the TM, plants it in ntm_xprt_private and records the call.  Always
 * succeeds.
 */
290 static int ut_tm_init(struct m0_net_transfer_mc *tm)
291 {
292  struct ut_tm_pvt *tmp;
294  M0_ALLOC_PTR(tmp);
295  tmp->tm = tm;
296  tm->ntm_xprt_private = tmp;
297  ut_tm_init_called = true;
298  return 0;
299 }
300 
/*
 * Fake xo_tm_fini(): verifies the private data's back-pointer, frees it
 * and records the call.
 */
301 static void ut_tm_fini(struct m0_net_transfer_mc *tm)
302 {
303  struct ut_tm_pvt *tmp;
305  tmp = tm->ntm_xprt_private;
306  M0_UT_ASSERT(tmp->tm == tm);
307  m0_free(tmp);
308  ut_tm_fini_called = true;
309  return;
310 }
311 
/* Thread handle for the background event-posting threads below; the test
 * joins it after each start/stop transition. */
312 static struct m0_thread ut_tm_thread;
/* Body of ut_post_tm_started_ev_thread(struct m0_net_end_point *ep) --
 * its signature (line 313) is elided in this listing.  Runs on a helper
 * thread: waits briefly, then posts the STARTED state-change event
 * (presumably via m0_net_tm_event_post, also elided at line 326). */
314 {
315  struct m0_net_tm_event ev = {
317  .nte_tm = ut_evt_tm,
318  .nte_ep = ep,
319  .nte_status = 0,
320  .nte_next_state = M0_NET_TM_STARTED
321  };
322  DELAY_MS(1);
323  ev.nte_time = m0_time_now();
324 
325  /* post state change event */
327 }
328 
/* Body of ut_post_state_change_ev_thread(int n) -- its signature (line
 * 329) is elided in this listing.  Helper-thread body: waits briefly and
 * posts a TM state-change event taking the TM to state `n` (the posting
 * call, line 341, is also elided). */
330 {
331  struct m0_net_tm_event ev = {
333  .nte_tm = ut_evt_tm,
334  .nte_status = 0,
335  .nte_next_state = (enum m0_net_tm_state) n
336  };
337  DELAY_MS(1);
338  ev.nte_time = m0_time_now();
339 
340  /* post state change event */
342 }
343 
/*
 * Fake xo_tm_start(): runs under the TM mutex.  Creates the TM's own end
 * point through the ops vector, then spawns a background thread to post
 * the STARTED state-change event (posting inline would assert on the held
 * lock).  NOTE(review): the M0_THREAD_INIT() launching the thread (lines
 * 363-365) is elided in this listing.
 */
344 static int ut_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
345 {
346  int rc;
347  const struct m0_net_xprt *xprt;
348  struct m0_net_end_point *ep;
349 
350  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
351  ut_tm_start_called = true;
352  ut_evt_tm = tm;
353 
354  /* create the end point (indirectly via the transport ops vector) */
355  xprt = tm->ntm_dom->nd_xprt;
356  rc = (*xprt->nx_ops->xo_end_point_create)(&ep, tm, addr);
357  if (rc != 0)
358  return rc;
359 
360  /* create bg thread to post start state change event.
361  cannot do it here: we are in tm lock, post would assert.
362  */
366  "state_change%d", M0_NET_TM_STARTED);
367  M0_UT_ASSERT(rc == 0);
368  return rc;
369 }
370 
/*
 * Fake xo_tm_stop(): runs under the TM mutex; spawns a background thread
 * to post the STOPPED state-change event.  NOTE(review): the
 * M0_THREAD_INIT() call (lines 378-380) is elided in this listing.
 */
371 static int ut_tm_stop(struct m0_net_transfer_mc *tm, bool cancel)
372 {
373  int rc;
374 
375  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
376  ut_tm_stop_called = true;
377  ut_evt_tm = tm;
381  "state_change%d", M0_NET_TM_STOPPED);
382  M0_UT_ASSERT(rc == 0);
383  return rc;
384 }
385 
386 static int ut_tm_confine(struct m0_net_transfer_mc *tm,
387  const struct m0_bitmap *processors)
388 {
389  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
390  ut_tm_confine_called = true;
391  ut_tm_confine_bm = processors;
392  return 0;
393 }
394 
/* Body of the fake xo_bev_deliver_sync() -- its signature (line 395) is
 * elided in this listing.  Runs under the TM mutex; records the call
 * (flag set at elided line 398) and reports success. */
396 {
397  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
399  return 0;
400 }
401 
/* Fake xo_bev_deliver_all(): runs under the TM mutex; nothing to deliver
 * in the fake transport.  (A flag-setting line, 405, is elided.) */
402 static void ut_bev_deliver_all(struct m0_net_transfer_mc *tm)
403 {
404  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
406  return;
407 }
408 
/*
 * Fake xo_bev_pending(): runs under the TM mutex; records the call and
 * returns a canned answer.  NOTE(review): line 413, which presumably
 * toggles/sets ut_bev_pending_last, is elided in this listing.
 */
409 static bool ut_bev_pending(struct m0_net_transfer_mc *tm)
410 {
411  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
412  ut_bev_pending_called = true;
414  return (bool)ut_bev_pending_last;
415 }
416 
417 static void ut_bev_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
418 {
419  M0_UT_ASSERT(m0_mutex_is_locked(&tm->ntm_mutex));
420  M0_UT_ASSERT(chan == &tm->ntm_chan);
421  ut_bev_notify_called = true;
422  return;
423 }
424 
/*
 * Fake transport operations vector.  xo_tm_confine and xo_bev_deliver_sync
 * are intentionally left NULL here and patched in at runtime so the test
 * can exercise both the -ENOSYS and the supported paths.
 * NOTE(review): the .xo_dom_init = ut_dom_init initializer (line 426) is
 * elided in this listing.
 */
425 static struct m0_net_xprt_ops ut_xprt_ops = {
427  .xo_dom_fini = ut_dom_fini,
428  .xo_get_max_buffer_size = ut_get_max_buffer_size,
429  .xo_get_max_buffer_segment_size = ut_get_max_buffer_segment_size,
430  .xo_get_max_buffer_segments = ut_get_max_buffer_segments,
431  .xo_get_max_buffer_desc_size = ut_get_max_buffer_desc_size,
432  .xo_end_point_create = ut_end_point_create,
433  .xo_buf_register = ut_buf_register,
434  .xo_buf_deregister = ut_buf_deregister,
435  .xo_buf_add = ut_buf_add,
436  .xo_buf_del = ut_buf_del,
437  .xo_tm_init = ut_tm_init,
438  .xo_tm_fini = ut_tm_fini,
439  .xo_tm_start = ut_tm_start,
440  .xo_tm_stop = ut_tm_stop,
441  /* define at runtime .xo_tm_confine */
442  /* define at runtime .xo_bev_deliver_sync */
443  .xo_bev_deliver_all = ut_bev_deliver_all,
444  .xo_bev_pending = ut_bev_pending,
445  .xo_bev_notify = ut_bev_notify
446 };
447 
/* The fake transport descriptor handed to m0_net_domain_init(). */
448 static struct m0_net_xprt ut_xprt = {
449  .nx_name = "ut/bulk",
450  .nx_ops = &ut_xprt_ops
451 };
452 
453 /* utility subs */
/*
 * Allocates one network buffer per queue type (M0_NET_QT_NR total), each
 * with a bufvec sized so that segments * segment-size never exceeds
 * buf_size.  Returns the buffer array; ownership passes to the caller.
 * NOTE(review): the first parameter line ("allocate_buffers(m0_bcount_t
 * buf_size,", line 455) and the M0_ALLOC_ARR of nbs (line 466) are elided
 * in this listing.
 */
454 static struct m0_net_buffer *
456  m0_bcount_t buf_seg_size,
457  int32_t buf_segs)
458 {
459  int rc;
460  int i;
461  struct m0_net_buffer *nbs;
462  struct m0_net_buffer *nb;
463  m0_bcount_t sz;
464  int32_t nr;
465 
467  for (i = 0; i < M0_NET_QT_NR; ++i) {
468  nb = &nbs[i];
469  M0_SET0(nb);
470  nr = buf_segs;
/* choose the largest per-segment size that keeps the total <= buf_size */
471  if ((buf_size / buf_segs) > buf_seg_size) {
472  sz = buf_seg_size;
473  M0_ASSERT((sz * nr) <= buf_size);
474  } else {
475  sz = buf_size/buf_segs;
476  }
477  rc = m0_bufvec_alloc(&nb->nb_buffer, nr, sz);
478  M0_UT_ASSERT(rc == 0);
480  M0_UT_ASSERT(m0_vec_count(&nb->nb_buffer.ov_vec) == (sz * nr));
481  }
482 
483  return nbs;
484 }
485 
486 static void make_desc(struct m0_net_buf_desc *desc)
487 {
488  static const char *p = "descriptor";
489  size_t len = strlen(p)+1;
490  desc->nbd_data = m0_alloc(len);
491  desc->nbd_len = len;
492  strcpy((char *)desc->nbd_data, p);
493 }
494 
495 /* callback subs */
/* Per-queue counters mirrored against the TM qstats at the end of the
 * test.  NOTE(review): the companion arrays ut_cb_calls[], total_bytes[]
 * and max_bytes[] (lines 496, 499-500) are elided in this listing. */
497 static uint64_t num_adds[M0_NET_QT_NR];
498 static uint64_t num_dels[M0_NET_QT_NR];
501 
/*
 * Common buffer-event callback: counts the invocation for queue @qt and
 * accumulates byte statistics so the test can cross-check TM qstats.
 * Length is counted only on success; for receive-type queues it comes
 * from the event (nbe_length), otherwise from the buffer (nb_length).
 * NOTE(review): several condition/assert lines (508, 511, 517-518,
 * 531-536, 540) are elided in this listing.
 */
502 static void ut_buffer_event_callback(const struct m0_net_buffer_event *ev,
503  enum m0_net_queue_type qt,
504  bool queue_check)
505 {
506  m0_bcount_t len = 0;
507  M0_UT_ASSERT(ev->nbe_buffer != NULL);
509  ut_cb_calls[qt]++;
510  if (queue_check)
512  /* Collect stats to test the q stats.
513  Length counted only on success.
514  Receive buffer lengths are in the event.
515  */
516  if (qt == M0_NET_QT_MSG_RECV ||
519  /* assert that the buffer length not set by the API */
520  M0_UT_ASSERT(ev->nbe_buffer->nb_length == 0);
521  if (ev->nbe_status == 0) {
522  len = ev->nbe_length;
523  M0_UT_ASSERT(len != 0);
524  ev->nbe_buffer->nb_length = ev->nbe_length;
525  }
526  } else {
527  if (ev->nbe_status == 0) {
528  len = ev->nbe_buffer->nb_length;
529  }
530  }
533  } else if (ev->nbe_status == 0 &&
537  }
538 
539  total_bytes[qt] += len;
541 
542 
/* Per-queue callback wrappers: each forwards its event to
 * ut_buffer_event_callback() for the corresponding queue type via
 * UT_CB_CALL.  The one-line bodies (547, 552, 557, 562, 567, 572) are
 * elided in this listing. */
543 #define UT_CB_CALL(_qt) ut_buffer_event_callback(ev, _qt, true)
544 static void ut_msg_recv_cb(const struct m0_net_buffer_event *ev)
545 {
547 }
548 
549 static void ut_msg_send_cb(const struct m0_net_buffer_event *ev)
550 {
552 }
553 
554 static void ut_passive_bulk_recv_cb(const struct m0_net_buffer_event *ev)
555 {
557 }
558 
559 static void ut_passive_bulk_send_cb(const struct m0_net_buffer_event *ev)
560 {
562 }
563 
564 static void ut_active_bulk_recv_cb(const struct m0_net_buffer_event *ev)
565 {
567 }
568 
569 static void ut_active_bulk_send_cb(const struct m0_net_buffer_event *ev)
570 {
572 }
573 
/* Shared callback for the multi-use (RETAIN) test pass; the queued-state
 * branch bodies (577-583) are elided in this listing. */
574 static void ut_multi_use_cb(const struct m0_net_buffer_event *ev)
575 {
576  M0_UT_ASSERT(ev->nbe_buffer != NULL);
579  } else {
581  }
584 }
585 
/* TM state-change event callback; its body (line 588, presumably
 * recording the event) is elided in this listing. */
586 void ut_tm_event_cb(const struct m0_net_tm_event *ev)
587 {
589 }
590 
591 /* UT transfer machine */
/* TM callback vector (ut_tm_cb, lines 592-593 elided): routes state
 * events to ut_tm_event_cb. */
594 };
595 
/* Per-queue buffer callback vector (ut_buf_cb; opener line 596 and the
 * per-queue entries 598-603 elided): wires the ut_*_cb wrappers above. */
597  .nbc_cb = {
604  },
605 };
606 
/* Multi-use buffer callback vector (ut_buf_multi_use_cb; entries 609-614
 * elided): every queue routes to ut_multi_use_cb. */
608  .nbc_cb = {
615  },
616 };
617 
/* The UT transfer machine; starts UNDEFINED, callbacks entry (line 619)
 * elided in this listing. */
618 static struct m0_net_transfer_mc ut_tm = {
620  .ntm_state = M0_NET_TM_UNDEFINED
621 };
622 
623 /*
624  Unit test starts
625  */
/*
 * Main UT body: exercises the whole m0_net bulk interface against the
 * fake transport -- descriptor copy/free, domain init and limit getters,
 * TM init/fini/restart, the optional confine op, TM start/stop with
 * asynchronous state-change events, end-point refcounting, buffer
 * add/del/completion on every queue, multi-use (RETAIN) buffers,
 * queue statistics, and the synchronous buffer event delivery APIs.
 * NOTE(review): many individual lines of the original are elided in this
 * listing (domain init/getter calls, thread joins, several asserts); the
 * visible control flow is kept byte-identical below.
 */
626 static void test_net_bulk_if(void)
627 {
628  int rc, i, reuse_cnt;
629  bool brc;
630  m0_bcount_t buf_size, buf_seg_size;
631  int32_t buf_segs;
632  struct m0_net_domain *dom = &utdom;
633  struct m0_net_transfer_mc *tm = &ut_tm;
634  struct m0_net_buffer *nbs;
635  struct m0_net_buffer *nb;
636  struct m0_net_end_point *ep1, *ep2, *ep;
637  struct m0_net_buf_desc d1, d2;
638  struct m0_clink tmwait;
639  struct m0_net_qstats qs[M0_NET_QT_NR];
640  m0_time_t m0tt_to_period;
641  struct m0_bitmap *procmask = (void *) -1; /* fake not null UT value */
642  enum { NUM_REUSES = 2 };
643 
/* Descriptor copy/free round-trip: copy must be a distinct allocation
 * with identical contents; free must zero the descriptor. */
644  M0_SET0(&d1);
645  M0_SET0(&d2);
646  make_desc(&d1);
647  M0_UT_ASSERT(d1.nbd_data != NULL);
648  M0_UT_ASSERT(d1.nbd_len > 0);
649  rc = m0_net_desc_copy(&d1, &d2);
650  M0_UT_ASSERT(rc == 0);
651  M0_UT_ASSERT(d2.nbd_data != NULL);
652  M0_UT_ASSERT(d2.nbd_len > 0);
653  M0_UT_ASSERT(d1.nbd_data != d2.nbd_data);
654  M0_UT_ASSERT(d1.nbd_len == d2.nbd_len);
655  M0_UT_ASSERT(memcmp(d1.nbd_data, d2.nbd_data, d1.nbd_len) == 0);
656  m0_net_desc_free(&d2);
657  M0_UT_ASSERT(d2.nbd_data == NULL);
658  M0_UT_ASSERT(d2.nbd_len == 0);
659  m0_net_desc_free(&d1);
660 
661  /* initialize the domain */
663  /* get max buffer size */
665  buf_size = 0;
666  /* get max buffer segment size */
668  buf_seg_size = 0;
669  /* get max buffer segments */
671  buf_segs = 0;
673  M0_UT_ASSERT(rc == 0);
675  M0_UT_ASSERT(dom->nd_xprt == &ut_xprt);
676  M0_UT_ASSERT(dom->nd_xprt_private == &ut_xprt_pvt);
677  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
678 
681  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
683 
686  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
687  M0_UT_ASSERT(buf_seg_size == UT_MAX_BUF_SEGMENT_SIZE);
688 
691  M0_ASSERT(m0_mutex_is_not_locked(&dom->nd_mutex));
692  M0_UT_ASSERT(buf_segs == UT_MAX_BUF_SEGMENTS);
693 
694  /* allocate buffers for testing */
695  nbs = allocate_buffers(buf_size, buf_seg_size, buf_segs);
696 
697  /* register the buffers */
698  for (i = 0; i < M0_NET_QT_NR; ++i) {
699  nb = &nbs[i];
700  nb->nb_flags = 0;
701  nb->nb_timeout = 0;
702  ut_buf_register_called = false;
704  M0_UT_ASSERT(rc == 0);
708  num_adds[i] = 0;
709  num_dels[i] = 0;
710  total_bytes[i] = 0;
711  ut_cb_calls[i] = 0;
712  max_bytes[i] = 0;
713  }
714  m0tt_to_period = m0_time(120, 0); /* 2 min */
715 
716  /* TM init with callbacks */
717  rc = m0_net_tm_init(tm, dom);
718  M0_UT_ASSERT(rc == 0);
722  /* should be able to fini it immediately */
723  ut_tm_fini_called = false;
724  m0_net_tm_fini(tm);
727 
728  /* should be able to init it again */
729  ut_tm_init_called = false;
730  ut_tm_fini_called = false;
731  rc = m0_net_tm_init(tm, dom);
732  M0_UT_ASSERT(rc == 0);
736 
/* Optional op: confine must fail with -ENOSYS until the op is patched
 * into the ops vector at runtime, then succeed. */
737  /* check the confine API */
738  M0_UT_ASSERT(dom->nd_xprt->nx_ops->xo_tm_confine == NULL);
739  rc = m0_net_tm_confine(tm, procmask);
740  M0_UT_ASSERT(rc == -ENOSYS); /* optional support */
742  ut_xprt_ops.xo_tm_confine = ut_tm_confine; /* provide the operation */
743  M0_UT_ASSERT(dom->nd_xprt->nx_ops->xo_tm_confine != NULL);
744  rc = m0_net_tm_confine(tm, procmask);
745  M0_UT_ASSERT(rc == 0);
747  M0_UT_ASSERT(ut_tm_confine_bm == procmask);
749 
750  /* TM start */
751  m0_clink_init(&tmwait, NULL);
752  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
753 
755  rc = m0_net_tm_start(tm, "addr2");
756  M0_UT_ASSERT(rc == 0);
760 
761  /* wait on channel for started */
762  m0_chan_wait(&tmwait);
763  m0_clink_del_lock(&tmwait);
767 
768  M0_UT_ASSERT(tm->ntm_ep != NULL);
770 
771  /* Test desired end point behavior
772  A real transport isn't actually forced to maintain
773  reference counts this way, but ought to do so.
774  */
776  rc = m0_net_end_point_create(&ep1, tm, NULL);
777  M0_UT_ASSERT(rc != 0); /* no dynamic */
779  M0_ASSERT(m0_mutex_is_not_locked(&tm->ntm_mutex));
780  M0_ASSERT(m0_nep_tlist_length(&tm->ntm_end_points) == 1);
781 
783  rc = m0_net_end_point_create(&ep1, tm, "addr1");
784  M0_UT_ASSERT(rc == 0);
786  M0_ASSERT(m0_mutex_is_not_locked(&tm->ntm_mutex));
787  M0_ASSERT(!m0_nep_tlist_is_empty(&tm->ntm_end_points));
789 
790  rc = m0_net_end_point_create(&ep2, tm, "addr2");
791  M0_UT_ASSERT(rc == 0);
792  M0_UT_ASSERT(ep2 != ep1);
793  M0_UT_ASSERT(ep2 == tm->ntm_ep);
795 
796  rc = m0_net_end_point_create(&ep, tm, "addr1");
797  M0_UT_ASSERT(rc == 0);
798  M0_UT_ASSERT(ep == ep1);
799  M0_UT_ASSERT(m0_atomic64_get(&ep->nep_ref.ref_cnt) == 2);
800 
802  m0_net_end_point_get(ep); /* refcnt=3 */
803  M0_UT_ASSERT(m0_atomic64_get(&ep->nep_ref.ref_cnt) == 3);
804 
806  m0_net_end_point_put(ep); /* refcnt=2 */
808  M0_UT_ASSERT(m0_atomic64_get(&ep->nep_ref.ref_cnt) == 2);
809 
810  m0_net_end_point_put(ep); /* refcnt=1 */
812  M0_UT_ASSERT(m0_atomic64_get(&ep->nep_ref.ref_cnt) == 1);
813 
814  m0_net_end_point_put(ep); /* refcnt=0 */
817  ep1 = NULL; /* not valid! */
818 
819  /* add MSG_RECV buf with a timeout in the past - should fail */
820  nb = &nbs[M0_NET_QT_MSG_RECV];
821  nb->nb_callbacks = &ut_buf_cb;
824  nb->nb_timeout = m0_time_sub(m0_time_now(), m0tt_to_period);
826  nb->nb_max_receive_msgs = 1;
827  rc = m0_net_buffer_add(nb, tm);
828  M0_UT_ASSERT(rc == -ETIME);
829 
830  /* add MSG_RECV buf - should succeeded as now started */
831  nb = &nbs[M0_NET_QT_MSG_RECV];
832  nb->nb_callbacks = &ut_buf_cb;
835  nb->nb_timeout = m0_time_add(m0_time_now(), m0tt_to_period);
837  nb->nb_max_receive_msgs = 1;
838  nb->nb_msgs_received = 999; /* arbitrary */
839  rc = m0_net_buffer_add(nb, tm);
840  M0_UT_ASSERT(rc == 0);
843  M0_UT_ASSERT(nb->nb_tm == tm);
844  M0_UT_ASSERT(nb->nb_msgs_received == 0); /* reset */
845  num_adds[nb->nb_qtype]++;
846  max_bytes[nb->nb_qtype] = max64u(nb->nb_length,
847  max_bytes[nb->nb_qtype]);
848 
849  /* clean up; real xprt would handle this itself */
852 
853  /* initialize and add remaining types of buffers
854  use buffer private callbacks for the bulk
855  */
856  for (i = M0_NET_QT_MSG_SEND; i < M0_NET_QT_NR; ++i) {
857  nb = &nbs[i];
859  nb->nb_qtype = i;
860  nb->nb_callbacks = &ut_buf_cb;
861  /* NB: real code sets nb_ep to server ep */
862  switch (i) {
863  case M0_NET_QT_MSG_SEND:
864  nb->nb_ep = ep2;
865  nb->nb_length = buf_size;
866  break;
868  M0_UT_ASSERT(nb->nb_length == 0);
869  nb->nb_ep = ep2;
870  break;
872  nb->nb_ep = ep2;
873  nb->nb_length = buf_size;
874  break;
876  M0_UT_ASSERT(nb->nb_length == 0);
877  make_desc(&nb->nb_desc);
878  break;
880  nb->nb_length = buf_size;
881  make_desc(&nb->nb_desc);
882  break;
883  }
884  nb->nb_timeout = m0_time_add(m0_time_now(), m0tt_to_period);
885  rc = m0_net_buffer_add(nb, tm);
886  M0_UT_ASSERT(rc == 0);
888  M0_UT_ASSERT(nb->nb_tm == tm);
889  num_adds[nb->nb_qtype]++;
891  max_bytes[nb->nb_qtype]);
892  }
894 
895  /* fake each type of buffer "post" response.
896  xprt normally does this
897  */
898  for (i = M0_NET_QT_MSG_RECV; i < M0_NET_QT_NR; ++i) {
899  struct m0_net_buffer_event ev = {
900  .nbe_buffer = &nbs[i],
901  .nbe_status = 0,
902  };
903  DELAY_MS(1);
904  ev.nbe_time = m0_time_now();
905  nb = &nbs[i];
906 
907  if (i == M0_NET_QT_MSG_RECV) {
908  /* simulate transport ep in recv msg */
909  ev.nbe_ep = ep2;
911  }
912 
913  if (i == M0_NET_QT_MSG_RECV ||
916  /* fake the length in the event */
917  ev.nbe_length = buf_size;
918  }
919 
922  M0_UT_ASSERT(ut_cb_calls[i] == 1);
925  if (i == M0_NET_QT_MSG_RECV)
926  M0_UT_ASSERT(nb->nb_msgs_received == 1);
927  }
929  /* add a buffer and fake del - check callback */
930  nb = &nbs[M0_NET_QT_PASSIVE_BULK_SEND];
934  rc = m0_net_buffer_add(nb, tm);
935  M0_UT_ASSERT(rc == 0);
936  num_adds[nb->nb_qtype]++;
937  max_bytes[nb->nb_qtype] = max64u(nb->nb_length,
938  max_bytes[nb->nb_qtype]);
939 
940  ut_buf_del_called = false;
941  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
942  m0_net_buffer_del(nb, tm);
945  num_dels[nb->nb_qtype]++;
946 
947  /* wait on channel for post (and consume UT thread) */
948  m0_chan_wait(&tmwait);
949  m0_clink_del_lock(&tmwait);
951  M0_UT_ASSERT(rc == 0);
952 
953  /* Initialize and add buffers for multi-use.
954  Note: the net API does not restrict multi-use to the recv queue.
955  */
957  for (i = M0_NET_QT_MSG_RECV; i < M0_NET_QT_NR; ++i) {
958  nb = &nbs[i];
960  nb->nb_qtype = i;
962  nb->nb_length = 0;
963  /* NB: real code sets nb_ep to server ep */
964  switch (i) {
965  case M0_NET_QT_MSG_RECV:
967  nb->nb_max_receive_msgs = 2;
968  break;
969  case M0_NET_QT_MSG_SEND:
970  nb->nb_ep = ep2;
971  nb->nb_length = buf_size;
972  break;
974  nb->nb_ep = ep2;
975  break;
977  nb->nb_ep = ep2;
978  nb->nb_length = buf_size;
979  break;
981  make_desc(&nb->nb_desc);
982  break;
984  nb->nb_length = buf_size;
985  make_desc(&nb->nb_desc);
986  break;
987  }
989  m0tt_to_period);
990  nb->nb_tm = NULL;
991  rc = m0_net_buffer_add(nb, tm);
992  M0_UT_ASSERT(rc == 0);
994  M0_UT_ASSERT(nb->nb_tm == tm);
995  num_adds[nb->nb_qtype]++;
997  max_bytes[nb->nb_qtype]);
998  }
1000 
/* On the last pass RETAIN is dropped so the buffers finally dequeue. */
1001  /* Issue multiple fake buffer "post" with the RETAIN flag. */
1002  for (reuse_cnt = 0; reuse_cnt < NUM_REUSES; ++reuse_cnt) {
1003  bool retain = true;
1004  if (reuse_cnt == NUM_REUSES - 1)
1005  retain = false;
1006  for (i = M0_NET_QT_MSG_RECV; i < M0_NET_QT_NR; ++i) {
1007  m0_time_t to_before = 0;
1008  struct m0_net_buffer_event ev = {
1009  .nbe_buffer = &nbs[i],
1010  .nbe_status = 0,
1011  };
1012  DELAY_MS(1);
1013  ev.nbe_time = m0_time_now();
1014  nb = &nbs[i];
1015 
1016  if (i == M0_NET_QT_MSG_RECV) {
1017  /* simulate transport ep in recv msg */
1018  ev.nbe_ep = ep2;
1019  m0_net_end_point_get(ep2);
1020  }
1021 
1022  if (i == M0_NET_QT_MSG_RECV ||
1025  /* fake the length in the event */
1026  ev.nbe_length = buf_size;
1027  nb->nb_length = 0; /* the tests expect this */
1028  }
1029 
1030  nb->nb_flags |= M0_NET_BUF_IN_USE;
1031  if (retain) {
1032  nb->nb_flags |= M0_NET_BUF_RETAIN;
1033  to_before = nb->nb_timeout;
1035  if (i == M0_NET_QT_MSG_SEND)
1036  m0_net_end_point_get(ep2); /* adjust */
1037  } else {
1039  }
1043  if (retain) {
1044  M0_UT_ASSERT(to_before == nb->nb_timeout);
1047  } else {
1049  M0_UT_ASSERT(!(nb->nb_flags &
1051  M0_UT_ASSERT(!(nb->nb_flags &
1053  }
1054  if (i == M0_NET_QT_MSG_RECV)
1056  reuse_cnt + 1);
1057  }
1058  }
1059  /* free end point */
1061  m0_net_end_point_put(ep2);
1062 
1063  /* TM stop */
1064  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
1065  rc = m0_net_tm_stop(tm, false);
1066  M0_UT_ASSERT(rc == 0);
1068 
1069  /* wait on channel for stopped */
1070  m0_chan_wait(&tmwait);
1071  m0_clink_del_lock(&tmwait);
1074 
1075  /* clean up; real xprt would handle this itself */
1078 
1079  /* de-register channel waiter */
1080  m0_clink_fini(&tmwait);
1081 
1082  /* get stats (specific queue, then all queues) */
1084  rc = m0_net_tm_stats_get(tm, i, &qs[0], false);
1085  M0_UT_ASSERT(rc == 0);
1086  M0_UT_ASSERT(qs[0].nqs_num_adds == num_adds[i]);
1087  M0_UT_ASSERT(qs[0].nqs_num_dels == num_dels[i]);
1088  M0_UT_ASSERT(qs[0].nqs_total_bytes == total_bytes[i]);
1089  M0_UT_ASSERT(qs[0].nqs_max_bytes == max_bytes[i]);
1090  M0_UT_ASSERT((qs[0].nqs_num_f_events + qs[0].nqs_num_s_events)
1091  == num_adds[i] + reuse_cnt - 1);
1092  M0_UT_ASSERT(qs[0].nqs_num_f_events + qs[0].nqs_num_s_events > 0 &&
1093  qs[0].nqs_time_in_queue > 0);
1094 
/* Second fetch resets the counters (reset=true), verified below. */
1095  rc = m0_net_tm_stats_get(tm, M0_NET_QT_NR, qs, true);
1096  M0_UT_ASSERT(rc == 0);
1097  for (i = 0; i < M0_NET_QT_NR; i++) {
1098  KPRN("i=%d\n", i);
1099 #define QS(x) KPRN("\t" #x "=%" PRId64 "\n", qs[i].nqs_##x)
1100 #define QS2(x) KPRN("\t" #x "=%" PRId64 " [%" PRId64 "]\n", qs[i].nqs_##x, x[i])
1101  QS2(num_adds);
1102  QS2(num_dels);
1103  QS2(total_bytes);
1104  QS(max_bytes);
1105  QS(num_f_events);
1106  QS(num_s_events);
1107  QS(time_in_queue);
1108  M0_UT_ASSERT(qs[i].nqs_num_adds == num_adds[i]);
1109  M0_UT_ASSERT(qs[i].nqs_num_dels == num_dels[i]);
1110  M0_UT_ASSERT(qs[i].nqs_total_bytes == total_bytes[i]);
1111  M0_UT_ASSERT(qs[i].nqs_total_bytes >= qs[i].nqs_max_bytes);
1112  M0_UT_ASSERT(qs[i].nqs_max_bytes == max_bytes[i]);
1113  M0_UT_ASSERT((qs[i].nqs_num_f_events + qs[i].nqs_num_s_events)
1114  == num_adds[i] + reuse_cnt - 1);
1115  M0_UT_ASSERT(qs[i].nqs_num_f_events +
1116  qs[i].nqs_num_s_events > 0 &&
1117  qs[i].nqs_time_in_queue > 0);
1118  }
1119 
1120  rc = m0_net_tm_stats_get(tm, M0_NET_QT_NR, qs, false);
1121  M0_UT_ASSERT(rc == 0);
1122  for (i = 0; i < M0_NET_QT_NR; i++) {
1123  M0_UT_ASSERT(qs[i].nqs_num_adds == 0);
1124  M0_UT_ASSERT(qs[i].nqs_num_dels == 0);
1125  M0_UT_ASSERT(qs[i].nqs_num_f_events == 0);
1126  M0_UT_ASSERT(qs[i].nqs_num_s_events == 0);
1127  M0_UT_ASSERT(qs[i].nqs_total_bytes == 0);
1128  M0_UT_ASSERT(qs[i].nqs_max_bytes == 0);
1129  M0_UT_ASSERT(qs[i].nqs_time_in_queue == 0);
1130  }
1131 
1132  /* fini the TM */
1133  ut_tm_fini_called = false;
1134  m0_net_tm_fini(tm);
1136 
1137  /* TM fini releases final end point */
1139 
1140  /*
1141  * Test APIs for synchronous buffer event delivery
1142  */
1143 
1144  /* restart the TM */
1145  ut_tm_init_called = false;
1146  ut_tm_fini_called = false;
1147  rc = m0_net_tm_init(tm, dom);
1148  M0_UT_ASSERT(rc == 0);
1153 
1154  /* request synchronous buffer event delivery */
1155  M0_UT_ASSERT(dom->nd_xprt->nx_ops->xo_bev_deliver_sync == NULL);
1157  M0_UT_ASSERT(rc == -ENOSYS); /* optional support */
1161  M0_UT_ASSERT(dom->nd_xprt->nx_ops->xo_bev_deliver_sync != NULL);
1164  M0_UT_ASSERT(rc == 0);
1167 
1168  /* start the TM */
1169  m0_clink_init(&tmwait, NULL);
1170  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
1171  rc = m0_net_tm_start(tm, "addr3");
1172  M0_UT_ASSERT(rc == 0);
1173  m0_chan_wait(&tmwait);
1174  m0_clink_del_lock(&tmwait);
1176  m0_thread_join(&ut_tm_thread); /* cleanup thread */
1178 
1179  /* test the synchronous buffer event delivery APIs */
1181  brc = m0_net_buffer_event_pending(tm);
1183  M0_UT_ASSERT(!brc);
1184 
1188 
1189  ut_bev_pending_called = false;
1190  brc = m0_net_buffer_event_pending(tm);
1192  M0_UT_ASSERT(brc);
1193 
1197 
1198  /* TM stop and fini */
1199  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
1200  rc = m0_net_tm_stop(tm, false);
1201  M0_UT_ASSERT(rc == 0);
1202  m0_chan_wait(&tmwait);
1203  m0_clink_del_lock(&tmwait);
1204  m0_thread_join(&ut_tm_thread); /* cleanup thread */
1206  m0_clink_fini(&tmwait);
1207  m0_net_tm_fini(tm);
1208 
1209  /* de-register and free buffers */
1210  for (i = 0; i < M0_NET_QT_NR; ++i) {
1211  nb = &nbs[i];
1212  ut_buf_deregister_called = false;
1213  m0_net_desc_free(&nb->nb_desc);
1217  m0_bufvec_free(&nb->nb_buffer);
1218  }
1219  m0_free(nbs);
1220 
1221  /* fini the domain */
1222  M0_UT_ASSERT(ut_dom_fini_called == false);
1225 }
1226 
1227 #include "net/ut/tm_provision_ut.c"
1228 
/* UT suite descriptor; the "struct m0_ut_suite m0_net_bulk_if_ut = {"
 * opener (line 1229) is elided in this listing. */
1230  .ts_name = "net-bulk-if",
1231  .ts_init = net_bulk_if_init,
1232  .ts_fini = NULL,
1233  .ts_tests = {
1234  { "net_bulk_if", test_net_bulk_if },
1235  { NULL, NULL }
1236  }
1237 };
1238 M0_EXPORTED(m0_net_bulk_if_ut);
1239 
1240 /*
1241  * Local variables:
1242  * c-indentation-style: "K&R"
1243  * c-basic-offset: 8
1244  * tab-width: 8
1245  * fill-column: 79
1246  * scroll-step: 1
1247  * End:
1248  */
static bool ut_bev_pending(struct m0_net_transfer_mc *tm)
Definition: bulk_if.c:409
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_segment_size(struct m0_net_domain *dom)
const struct m0_net_xprt_ops * nx_ops
Definition: net.h:126
static uint64_t num_adds[M0_NET_QT_NR]
Definition: bulk_if.c:497
static struct m0_addb2_philter p
Definition: consumer.c:40
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
static bool ut_bev_pending_called
Definition: bulk_if.c:76
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static m0_bcount_t ut_get_max_buffer_segment_size(const struct m0_net_domain *dom)
Definition: bulk_if.c:151
void m0_net_domain_fini(struct m0_net_domain *dom)
Definition: domain.c:71
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: tm.c:261
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
char * addr
Definition: bulk_if.c:170
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
static const struct m0_bitmap * ut_tm_confine_bm
Definition: bulk_if.c:65
static m0_bcount_t ut_get_max_buffer_desc_size(const struct m0_net_domain *dom)
Definition: bulk_if.c:164
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:65
static void ut_multi_use_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:574
static bool ut_get_max_buffer_segments_called
Definition: bulk_if.c:61
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
uint32_t nbd_len
Definition: net_otw_types.h:37
static int32_t ut_get_max_buffer_segments(const struct m0_net_domain *dom)
Definition: bulk_if.c:158
static int ut_tm_confine(struct m0_net_transfer_mc *tm, const struct m0_bitmap *processors)
Definition: bulk_if.c:386
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
static void ut_passive_bulk_recv_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:554
#define KPRN(fmt,...)
Definition: bulk_if.c:43
static bool ut_end_point_create_called
Definition: bulk_if.c:62
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
static void ut_post_tm_started_ev_thread(struct m0_net_end_point *ep)
Definition: bulk_if.c:313
static void ut_active_bulk_recv_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:564
static m0_bcount_t ut_get_max_buffer_size(const struct m0_net_domain *dom)
Definition: bulk_if.c:145
static bool ut_multi_use_expect_queued
Definition: bulk_if.c:78
M0_INTERNAL int m0_net_tm_stats_get(struct m0_net_transfer_mc *tm, enum m0_net_queue_type qtype, struct m0_net_qstats *qs, bool reset)
Definition: tm.c:343
uint8_t * nbd_data
Definition: net_otw_types.h:38
struct m0_vec ov_vec
Definition: vec.h:147
enum m0_net_tm_state ntm_state
Definition: net.h:819
static void ut_tm_fini(struct m0_net_transfer_mc *tm)
Definition: bulk_if.c:301
struct m0_thread ut_del_thread
Definition: bulk_if.c:272
m0_bcount_t nb_length
Definition: net.h:1334
uint64_t nb_flags
Definition: net.h:1489
struct m0_mutex nd_mutex
Definition: net.h:381
static bool ut_tm_fini_called
Definition: bulk_if.c:71
static void ut_end_point_release(struct m0_ref *ref)
Definition: bulk_if.c:175
struct m0_net_domain * ntm_dom
Definition: net.h:853
static void ut_active_bulk_send_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:569
struct m0_net_buffer_callbacks ut_buf_multi_use_cb
Definition: bulk_if.c:607
static bool ut_buf_register_called
Definition: bulk_if.c:66
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:261
struct m0_mutex m0_net_mutex
Definition: net.c:59
m0_time_t nbe_time
Definition: net.h:1197
m0_bcount_t nb_min_receive_size
Definition: net.h:1496
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
Definition: ut.h:77
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
#define UT_CB_CALL(_qt)
Definition: bulk_if.c:543
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
bool ntm_bev_auto_deliver
Definition: net.h:891
static int ut_cb_calls[M0_NET_QT_NR]
Definition: bulk_if.c:496
struct m0_tl ntm_end_points
Definition: net.h:856
#define m0_tl_endfor
Definition: tlist.h:700
static bool ut_tm_stop_called
Definition: bulk_if.c:73
M0_INTERNAL int m0_bufvec_alloc(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size)
Definition: vec.c:220
static struct @424 ut_xprt_pvt
Definition: sock.c:754
struct m0_chan ntm_chan
Definition: net.h:874
uint32_t nb_msgs_received
Definition: net.h:1511
static void make_desc(struct m0_net_buf_desc *desc)
Definition: bulk_if.c:486
M0_INTERNAL void m0_bufvec_free(struct m0_bufvec *bufvec)
Definition: vec.c:395
int i
Definition: dir.c:1033
static int net_bulk_if_init(void)
Definition: bulk_if.c:83
struct m0_net_end_point uep
Definition: bulk_if.c:171
int32_t nbe_status
Definition: net.h:1218
static void ut_post_state_change_ev_thread(int n)
Definition: bulk_if.c:329
static void ut_buf_del(struct m0_net_buffer *nb)
Definition: bulk_if.c:273
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
void * ntm_xprt_private
Definition: net.h:886
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
Definition: tm.c:160
static void ut_msg_send_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:549
static bool ut_bev_notify_called
Definition: bulk_if.c:80
Definition: refs.h:34
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
uint32_t nb_max_receive_msgs
Definition: net.h:1502
static int ut_tm_init(struct m0_net_transfer_mc *tm)
Definition: bulk_if.c:290
static int ut_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: bulk_if.c:191
#define M0_ASSERT(cond)
struct m0_ut_suite m0_net_bulk_if_ut
Definition: bulk_if.c:1229
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
Definition: tm.c:204
static struct m0_net_xprt_ops ut_xprt_ops
Definition: bulk_if.c:425
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
const struct m0_net_xprt * nd_xprt
Definition: net.h:396
static bool ut_end_point_release_called
Definition: bulk_if.c:63
static bool ut_buf_deregister_called
Definition: bulk_if.c:67
static struct m0_net_buffer * allocate_buffers(m0_bcount_t buf_size, m0_bcount_t buf_seg_size, int32_t buf_segs)
Definition: bulk_if.c:455
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
m0_time_t m0_time_now(void)
Definition: time.c:134
static m0_bcount_t max_bytes[M0_NET_QT_NR]
Definition: bulk_if.c:500
struct m0_atomic64 ref_cnt
Definition: refs.h:38
static bool ut_dom_init_called
Definition: bulk_if.c:57
static int ut_tm_stop(struct m0_net_transfer_mc *tm, bool cancel)
Definition: bulk_if.c:371
m0_net_tm_state
Definition: net.h:630
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
static struct m0_stob_domain * dom
Definition: storage.c:38
static bool ut_bev_deliver_all_called
Definition: bulk_if.c:75
static bool ut_dom_fini_called
Definition: bulk_if.c:58
static bool ut_tm_init_called
Definition: bulk_if.c:70
static bool ut_buf_add_called
Definition: bulk_if.c:69
static int ut_dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: bulk_if.c:114
#define QS(x)
struct m0_list_link ntm_dom_linkage
Definition: net.h:883
static bool ut_bev_deliver_sync_called
Definition: bulk_if.c:74
struct m0_net_domain * nb_dom
Definition: net.h:1351
static bool ut_tm_confine_called
Definition: bulk_if.c:64
void * m0_alloc(size_t size)
Definition: memory.c:126
M0_INTERNAL void m0_net_tm_event_post(const struct m0_net_tm_event *ev)
Definition: tm.c:84
const struct m0_net_buffer_callbacks * nb_callbacks
Definition: net.h:1369
M0_INTERNAL bool m0_list_contains(const struct m0_list *list, const struct m0_list_link *link)
Definition: list.c:87
static bool ut_buf_del_called
Definition: bulk_if.c:68
Definition: xcode.h:73
static int ut_tm_event_cb_calls
Definition: bulk_if.c:81
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
Definition: net.c:87
static struct m0_thread ut_tm_thread
Definition: bulk_if.c:312
uint32_t v_nr
Definition: vec.h:51
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_size(struct m0_net_domain *dom)
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
Definition: net.h:1272
Definition: chan.h:229
struct m0_net_transfer_mc * nep_tm
Definition: net.h:493
#define DELAY_MS(ms)
Definition: bulk_if.c:46
M0_INTERNAL int m0_net_desc_copy(const struct m0_net_buf_desc *from_desc, struct m0_net_buf_desc *to_desc)
Definition: net.c:74
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
static struct m0_net_buffer * ut_multi_use_got_buf
Definition: bulk_if.c:79
static struct m0_net_transfer_mc ut_tm
Definition: bulk_if.c:35
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
Definition: tm.c:293
static void ut_bev_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
Definition: bulk_if.c:417
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
M0_INTERNAL void m0_net_buffer_event_deliver_all(struct m0_net_transfer_mc *tm)
Definition: tm.c:397
int(* xo_tm_confine)(struct m0_net_transfer_mc *tm, const struct m0_bitmap *processors)
Definition: net.h:170
struct m0_ref nep_ref
Definition: net.h:491
static int ut_buf_add(struct m0_net_buffer *nb)
Definition: bulk_if.c:240
static int ut_bev_pending_last
Definition: bulk_if.c:77
static void ut_post_del_thread(struct m0_net_buffer *nb)
Definition: bulk_if.c:257
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
M0_INTERNAL void m0_net_buffer_event_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
Definition: tm.c:423
const char * ts_name
Definition: ut.h:99
int(* xo_end_point_create)(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: net.h:230
static void ut_msg_recv_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:544
static int ut_buf_register(struct m0_net_buffer *nb)
Definition: bulk_if.c:226
uint64_t n
Definition: fops.h:107
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:107
static bool ut_tm_start_called
Definition: bulk_if.c:72
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
Definition: domain.c:36
char * ep
Definition: sw.h:132
static bool ut_get_max_buffer_size_called
Definition: bulk_if.c:60
#define QS2(x)
M0_INTERNAL bool m0_net_buffer_event_pending(struct m0_net_transfer_mc *tm)
Definition: tm.c:409
static struct m0_chan chan[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_net_buffer_event_deliver_synchronously(struct m0_net_transfer_mc *tm)
Definition: tm.c:377
Definition: queue.c:27
struct m0_net_end_point * ntm_ep
Definition: net.h:868
void ut_tm_event_cb(const struct m0_net_tm_event *ev)
Definition: bulk_if.c:586
m0_net_queue_type
Definition: net.h:591
static uint64_t num_dels[M0_NET_QT_NR]
Definition: bulk_if.c:498
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:247
static void ut_bev_deliver_all(struct m0_net_transfer_mc *tm)
Definition: bulk_if.c:402
static struct m0_net_end_point * ut_last_ep_released
Definition: bulk_if.c:174
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
M0_INTERNAL int m0_net_tm_confine(struct m0_net_transfer_mc *tm, const struct m0_bitmap *processors)
Definition: tm.c:356
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static m0_bcount_t total_bytes[M0_NET_QT_NR]
Definition: bulk_if.c:499
struct m0_net_transfer_mc * tm
Definition: bulk_if.c:288
static void ut_dom_fini(struct m0_net_domain *dom)
Definition: bulk_if.c:130
const char * nx_name
Definition: net.h:125
static bool ut_get_max_buffer_segment_size_called
Definition: bulk_if.c:59
M0_INTERNAL int32_t m0_net_domain_get_max_buffer_segments(struct m0_net_domain *dom)
static void ut_buffer_event_callback(const struct m0_net_buffer_event *ev, enum m0_net_queue_type qt, bool queue_check)
Definition: bulk_if.c:502
struct m0_net_tm_callbacks ut_tm_cb
Definition: bulk_if.c:592
int(* xo_dom_init)(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: net.h:139
Definition: bulk_if.c:169
static void ut_passive_bulk_send_cb(const struct m0_net_buffer_event *ev)
Definition: bulk_if.c:559
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
static uint32_t buf_size
Definition: ad.c:75
enum m0_net_tm_ev_type nte_type
Definition: net.h:691
static struct m0_net_xprt ut_xprt
Definition: bulk_if.c:448
m0_time_t nte_time
Definition: net.h:701
struct m0_net_buffer_callbacks ut_buf_cb
Definition: bulk_if.c:596
struct m0_net_xprt * xprt
Definition: module.c:61
static void test_net_bulk_if(void)
Definition: bulk_if.c:626
int num
Definition: bulk_if.c:54
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
int(* xo_bev_deliver_sync)(struct m0_net_transfer_mc *tm)
Definition: net.h:301
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: ep.c:56
static int ut_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: bulk_if.c:344
static int ut_bev_deliver_sync(struct m0_net_transfer_mc *tm)
Definition: bulk_if.c:395
static struct m0_net_domain utdom
Definition: bulk_if.c:34
const struct m0_net_tm_callbacks * ntm_callbacks
Definition: net.h:816
#define M0_UT_ASSERT(a)
Definition: ut.h:46
static uint64_t max64u(uint64_t a, uint64_t b)
Definition: arith.h:71
struct m0_net_end_point * nb_ep
Definition: net.h:1424
static struct m0_net_transfer_mc * ut_evt_tm
Definition: bulk_if.c:36
static void ut_buf_deregister(struct m0_net_buffer *nb)
Definition: bulk_if.c:233