Motr  M0
mem_xprt_xo.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET
24 #include "lib/trace.h"
25 #include "lib/memory.h"
26 #include "lib/misc.h"
27 #include "lib/string.h" /* m0_streq */
29 
39 static struct m0_list mem_domains;
40 
41 /* forward reference */
43 
/*
 * Module-level initializer for the bulk-mem transport. The visible path
 * returns 0.
 *
 * NOTE(review): listing lines 49-50 and 52 are missing from this extract.
 * Line 52 is the statement controlled by the `if` below, so `return 0` is
 * NOT the conditional body. The cross-reference index at the end of this
 * page lists m0_list_init(), m0_net_xprt_register() and
 * m0_net_xprt_default_set() as symbols used by this file -- presumably those
 * are the dropped calls; confirm against the full source.
 */
47 M0_INTERNAL int m0_mem_xprt_init(void)
48 {
    /* Compares the M0_DEFAULT_NETWORK macro with the literal "MEMBULK". */
51  if (m0_streq(M0_DEFAULT_NETWORK, "MEMBULK"))
    /* (listing line 52, the body of the `if`, is absent here) */
53  return 0;
54 }
55 
/*
 * Module-level finalizer for the bulk-mem transport (by name, the
 * counterpart of m0_mem_xprt_init()).
 *
 * NOTE(review): the body (listing lines 61-62) is missing from this extract.
 * The cross-reference index lists m0_net_xprt_deregister() and m0_list_fini()
 * as symbols used by this file -- presumably they are called here; confirm.
 */
59 M0_INTERNAL void m0_mem_xprt_fini(void)
60 {
63 }
64 
65 /* To reduce global symbols, yet make the code readable, we
66  include other .c files with static symbols into this file.
67  Dependency information must be captured in Makefile.am.
68 
69  Static functions should be declared in the private header file
70  so that the order of their definition does not matter.
71  */
76 
78 {
    /*
     * NOTE(review): the signature (listing line 77) is missing from this
     * extract; the cross-reference index gives it as
     *   static m0_bcount_t mem_buffer_length(const struct m0_net_buffer *nb)
     *
     * Returns m0_vec_count() of the buffer's vector, nb->nb_buffer.ov_vec
     * (presumably the sum of the segment sizes -- confirm in vec.c).
     */
79  return m0_vec_count(&nb->nb_buffer.ov_vec);
80 }
81 
/*
 * Checks whether a network buffer's vector fits the bulk-mem limits.
 *
 * Accumulates the per-segment byte counts of nb->nb_buffer.ov_vec and
 * returns true only if the total does not exceed
 * M0_NET_BULK_MEM_MAX_BUFFER_SIZE. Two earlier checks return false.
 *
 * NOTE(review): listing lines 91 and 94 -- the conditions guarding the two
 * early `return false` statements -- are missing from this extract.
 * Presumably they test the segment count and the per-segment size against
 * their respective maxima; confirm against the full source.
 */
85 static bool mem_buffer_in_bounds(const struct m0_net_buffer *nb)
86 {
87  const struct m0_vec *v = &nb->nb_buffer.ov_vec;
88  uint32_t i;
89  m0_bcount_t len = 0;
90 
    /* (guard condition on listing line 91 is absent here) */
92  return false;
93  for (i = 0; i < v->v_nr; ++i) {
    /* (guard condition on listing line 94 is absent here) */
95  return false;
    /* The running total must not wrap around before it is accumulated. */
96  M0_ASSERT(len + v->v_count[i] >= len);
97  len += v->v_count[i];
98  }
99  return len <= M0_NET_BULK_MEM_MAX_BUFFER_SIZE;
100 }
101 
/*
 * Copies num_bytes from the source buffer s_nb into the destination
 * buffer d_nb using bufvec cursors.
 *
 * Returns 0 on success, or -EFBIG when the destination is shorter than
 * num_bytes. The caller must guarantee (M0_PRE) that the source holds at
 * least num_bytes. A short copy is treated as a programming error
 * (M0_ASSERT), not as a run-time failure.
 *
 * NOTE(review): listing line 126 is missing from this extract. s_cur is used
 * below without a visible initialization, so the dropped line is presumably
 * m0_bufvec_cursor_init(&s_cur, &s_nb->nb_buffer); confirm.
 */
113 static int mem_copy_buffer(struct m0_net_buffer *d_nb,
114  struct m0_net_buffer *s_nb,
115  m0_bcount_t num_bytes)
116 {
117  struct m0_bufvec_cursor s_cur;
118  struct m0_bufvec_cursor d_cur;
119  m0_bcount_t bytes_copied;
120 
    /* Destination too small: reported to the caller, not asserted. */
121  if (mem_buffer_length(d_nb) < num_bytes) {
122  return M0_ERR(-EFBIG);
123  }
124  M0_PRE(mem_buffer_length(s_nb) >= num_bytes);
125 
127  m0_bufvec_cursor_init(&d_cur, &d_nb->nb_buffer);
128  bytes_copied = m0_bufvec_cursor_copy(&d_cur, &s_cur, num_bytes);
129  M0_ASSERT(bytes_copied == num_bytes);
130 
131  return 0;
132 }
133 
/*
 * Adds work item wi to the work of transfer machine private data tp.
 * This function is also exported to derived transports through the
 * .bmo_wi_add slot of mem_xprt_methods.
 *
 * NOTE(review): the body (listing lines 140-141) is missing from this
 * extract. The cross-reference index lists m0_list_add_tail(),
 * xtm_work_list, m0_cond_signal() and xtm_work_list_cv -- presumably the
 * item is appended to tp->xtm_work_list and the condition variable is
 * signalled; confirm against the full source.
 */
137 static void mem_wi_add(struct m0_net_bulk_mem_work_item *wi,
138  struct m0_net_bulk_mem_tm_pvt *tp)
139 {
142 }
143 
148 {
    /*
     * NOTE(review): the signature (listing line 147) is missing from this
     * extract; the cross-reference index gives it as
     *   static void mem_wi_post_buffer_event(
     *           struct m0_net_bulk_mem_work_item *wi)
     *
     * Builds a buffer event from the work item: the buffer owning wi, the
     * status, length and end point recorded in wi, offset 0, and the current
     * time. The status must be zero or negative (M0_PRE).
     *
     * NOTE(review): listing line 159 is missing; ev is built but no use of
     * it is visible. The index lists m0_net_buffer_event_post(), which is
     * presumably called with &ev there; confirm.
     */
149  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
150  struct m0_net_buffer_event ev = {
151  .nbe_buffer = nb,
152  .nbe_status = wi->xwi_status,
153  .nbe_offset = 0,
154  .nbe_length = wi->xwi_nbe_length,
155  .nbe_ep = wi->xwi_nbe_ep
156  };
157  M0_PRE(wi->xwi_status <= 0);
158  ev.nbe_time = m0_time_now();
160  return;
161 }
162 
/*
 * Domain invariant: true when the private domain data exists and its
 * xd_dom back pointer refers to dom.
 *
 * NOTE(review): listing line 165, which declares and sets dp, is missing
 * from this extract; presumably dp = mem_dom_to_pvt(dom). Confirm.
 */
163 static bool mem_dom_invariant(const struct m0_net_domain *dom)
164 {
166  return dp != NULL && dp->xd_dom == dom;
167 }
168 
/*
 * End point invariant: true when the private end point obtained from ep via
 * mem_ep_to_pvt() carries the expected magic number and its embedded
 * end point's nep_addr points at the start of its own xep_addr array.
 *
 * Unlike the other invariants in this file, there is no NULL check on the
 * private pointer before it is dereferenced.
 */
169 static bool mem_ep_invariant(const struct m0_net_end_point *ep)
170 {
171  const struct m0_net_bulk_mem_end_point *mep = mem_ep_to_pvt(ep);
172  return mep->xep_magic == M0_NET_BULK_MEM_XEP_MAGIC &&
173  mep->xep_ep.nep_addr == &mep->xep_addr[0];
174 }
175 
/*
 * Buffer invariant: requires the private buffer data to exist and its
 * xb_buffer back pointer to refer to nb.
 *
 * NOTE(review): listing line 180, the final operand of the `&&` chain, is
 * missing from this extract, so the full condition cannot be stated here.
 */
176 static bool mem_buffer_invariant(const struct m0_net_buffer *nb)
177 {
178  const struct m0_net_bulk_mem_buffer_pvt *bp = mem_buffer_to_pvt(nb);
179  return bp != NULL && bp->xb_buffer == nb &&
181 }
182 
/*
 * Transfer machine invariant: requires the private TM data to exist and its
 * xtm_tm back pointer to refer to tm.
 *
 * NOTE(review): listing line 187, the final operand of the `&&` chain, is
 * missing from this extract, so the full condition cannot be stated here.
 */
183 static bool mem_tm_invariant(const struct m0_net_transfer_mc *tm)
184 {
185  const struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
186  return tp != NULL && tp->xtm_tm == tm &&
188 }
189 
/*
 * Transport op: initializes the bulk-mem private data of a network domain.
 *
 * If dom->nd_xprt_private is already set, that structure is reused;
 * otherwise a new one is allocated and attached to the domain. The function
 * then sets the back pointer, the internal method table, the tunables
 * (2 address tuples, 1 TM thread) and resets the buffer id counter.
 * xd_derived records whether xprt is something other than
 * m0_net_bulk_mem_xprt; other functions in this file use that flag to decide
 * who owns the private structures.
 *
 * Returns 0 on success, -ENOMEM if the private data cannot be allocated.
 *
 * NOTE(review): listing lines 208, 226, 232 and 234 are missing from this
 * extract (the index mentions mem_domains / xd_dom_linkage / m0_list_add,
 * presumably used on one of them; confirm).
 * NOTE(review): the -ENOMEM path uses M0_RC() whereas the rest of this file
 * reports allocation failures with M0_ERR().
 */
200 static int mem_xo_dom_init(const struct m0_net_xprt *xprt,
201  struct m0_net_domain *dom)
202 {
203  struct m0_net_bulk_mem_domain_pvt *dp;
204 
205  M0_ENTRY();
206 
    /* Private data supplied by the caller is reused, not reallocated. */
207  if (dom->nd_xprt_private != NULL) {
209  dp = dom->nd_xprt_private;
210  } else {
211  M0_ALLOC_PTR(dp);
212  if (dp == NULL) {
213  return M0_RC(-ENOMEM);
214  }
215  dom->nd_xprt_private = dp;
216  }
217  M0_ASSERT(mem_dom_to_pvt(dom) == dp);
218  dp->xd_dom = dom;
219  dp->xd_ops = &mem_xprt_methods;
220 
221  /* tunable parameters */
222  dp->xd_addr_tuples = 2;
223  dp->xd_num_tm_threads = 1;
224 
225  dp->xd_buf_id_counter = 0;
227 
    /* Any transport other than the base bulk-mem one counts as derived. */
228  if (xprt != &m0_net_bulk_mem_xprt) {
229  dp->xd_derived = true;
230  } else {
231  dp->xd_derived = false;
233  }
235 
236  return M0_RC(0);
237 }
238 
/*
 * Transport op: releases the bulk-mem private data of a domain.
 *
 * For a derived transport (xd_derived set) nothing is freed here. Otherwise
 * the private structure is freed and dom->nd_xprt_private is cleared.
 *
 * NOTE(review): listing lines 247-248 (which declare and set dp) and 252 are
 * missing from this extract. The index lists m0_list_del() and
 * xd_dom_linkage, presumably used on line 252; confirm.
 */
245 static void mem_xo_dom_fini(struct m0_net_domain *dom)
246 {
249 
250  if (dp->xd_derived)
251  return;
253  m0_free(dp);
254  dom->nd_xprt_private = NULL;
255 }
256 
258 {
    /*
     * NOTE(review): both the signature (listing line 257) and the body
     * (lines 259-260) are missing from this extract. Per the cross-reference
     * index the signature is
     *   static m0_bcount_t mem_xo_get_max_buffer_size(
     *           const struct m0_net_domain *dom)
     * The returned value cannot be determined from this extract.
     */
261 }
262 
264  const struct m0_net_domain *dom)
265 {
    /*
     * NOTE(review): the first signature line (listing line 263) and the body
     * (lines 266-267) are missing from this extract. Per the cross-reference
     * index the signature is
     *   static m0_bcount_t mem_xo_get_max_buffer_segment_size(
     *           const struct m0_net_domain *dom)
     * The returned value cannot be determined from this extract.
     */
268 }
269 
/*
 * Transport op installed as .xo_get_max_buffer_segments; returns an int32_t
 * for the given domain.
 *
 * NOTE(review): the body (listing lines 272-273) is missing from this
 * extract, so the returned value cannot be determined here.
 */
270 static int32_t mem_xo_get_max_buffer_segments(const struct m0_net_domain *dom)
271 {
274 }
275 
291  struct m0_net_transfer_mc *tm,
292  const char *addr)
293 {
    /*
     * Transport op: parses a textual address and creates an end point.
     *
     * NOTE(review): listing lines 290 (start of the signature), 294 and 301
     * are missing from this extract. Per the cross-reference index the
     * signature is
     *   static int mem_xo_end_point_create(struct m0_net_end_point **epp,
     *           struct m0_net_transfer_mc *tm, const char *addr)
     * `buf` and `dp` are used below without a visible declaration;
     * presumably they are declared on the dropped lines. Confirm.
     *
     * Accepted address forms, as implemented below:
     *   "<dotted-ip>:<port>"        when dp->xd_addr_tuples == 2
     *   "<dotted-ip>:<port>:<id>"   when dp->xd_addr_tuples == 3 (id != 0)
     *
     * Returns the result of mem_bmo_ep_create() on success, -ENOSYS when
     * addr is NULL, and -EINVAL for a malformed address.
     *
     * NOTE(review), hazards visible in this code:
     *  - Neither sscanf() return value is checked. pnum has no initializer,
     *    so a non-numeric port field leaves it indeterminate before htons().
     *  - In 3-tuple mode `*p++ = '\0'` runs even when the second ':' was not
     *    found, which moves p one byte past the string terminator before
     *    sscanf() reads from it.
     *  - Only sin_port and sin_addr of `sa` are assigned in the visible
     *    lines; the other members stay uninitialized.
     */
295  const char *dot_ip;
296  char *p;
297  char *pp;
298  int pnum;
299  struct sockaddr_in sa;
300  uint32_t id = 0;
302 
303  M0_PRE(M0_IN(dp->xd_addr_tuples, (2, 3)));
304 
305  if (addr == NULL)
306  return M0_ERR(-ENOSYS); /* no dynamic addressing */
307 
    /* Work on a private, always NUL-terminated copy (may truncate). */
308  strncpy(buf, addr, sizeof(buf)-1); /* copy to modify */
309  buf[sizeof(buf)-1] = '\0';
    /* Find the first ':' -- the end of the IP field; it is mandatory. */
310  for (p=buf; *p && *p != ':'; p++);
311  if (*p == '\0')
312  return M0_ERR(-EINVAL);
313  *p++ = '\0'; /* terminate the ip address : */
314  pp = p;
    /* pp marks the port field; scan to its end (next ':' or NUL). */
315  for (p=pp; *p && *p != ':'; p++);
316  if (dp->xd_addr_tuples == 3) {
317  *p++ = '\0'; /* terminate the port number */
318  sscanf(p, "%u", &id);
    /* id keeps its initial 0 if nothing was parsed; 0 is rejected. */
319  if (id == 0)
320  return M0_ERR(-EINVAL);
321  }
322  else if (*p == ':')
323  return M0_ERR(-EINVAL); /* 3-tuple where expecting 2 */
324  sscanf(pp, "%d", &pnum);
325  sa.sin_port = htons(pnum);
326  dot_ip = buf;
    /* Kernel build: a zero result from in_aton() is treated as invalid. */
327 #ifdef __KERNEL__
328  sa.sin_addr.s_addr = in_aton(dot_ip);
329  if (sa.sin_addr.s_addr == 0)
330  return M0_ERR(-EINVAL);
331 #else
332  if (inet_aton(dot_ip, &sa.sin_addr) == 0)
333  return M0_ERR(-EINVAL);
334 #endif
335  return mem_bmo_ep_create(epp, tm, &sa, id);
336 }
337 
/*
 * Transport op: registers a network buffer with the bulk-mem transport.
 *
 * The buffer must belong to a valid domain (M0_PRE). A buffer that fails the
 * bounds check is rejected with -EFBIG. For a derived transport the private
 * buffer data must already be attached to nb; otherwise it must be absent
 * and is allocated here (-ENOMEM on failure). The private data is then
 * linked back to nb and its embedded work item is reset: the list link is
 * initialized and the opcode is set to M0_NET_XOP_NR.
 *
 * Returns 0 on success.
 *
 * NOTE(review): listing lines 349 (presumably the declaration of bp), 367
 * and 372 are missing from this extract; confirm against the full source.
 */
346 static int mem_xo_buf_register(struct m0_net_buffer *nb)
347 {
348  struct m0_net_bulk_mem_domain_pvt *dp;
350 
351  M0_PRE(nb->nb_dom != NULL && mem_dom_invariant(nb->nb_dom));
352 
353  if (!mem_bmo_buffer_in_bounds(nb))
354  return M0_ERR(-EFBIG);
355 
356  dp = mem_dom_to_pvt(nb->nb_dom);
    /* Derived transports supply the private data; the base one allocates. */
357  if (dp->xd_derived) {
358  M0_PRE(nb->nb_xprt_private != NULL);
359  bp = nb->nb_xprt_private;
360  } else {
361  M0_PRE(nb->nb_xprt_private == NULL);
362  M0_ALLOC_PTR(bp);
363  if (bp == NULL)
364  return M0_ERR(-ENOMEM);
365  nb->nb_xprt_private = bp;
366  }
368 
369  bp->xb_buffer = nb;
370  m0_list_link_init(&bp->xb_wi.xwi_link);
371  bp->xb_wi.xwi_op = M0_NET_XOP_NR;
373  return 0;
374 }
375 
/*
 * Transport op: undoes mem_xo_buf_register().
 *
 * Finalizes the work item's list link. Unless the domain belongs to a
 * derived transport, it also frees the private buffer data and clears
 * nb->nb_xprt_private.
 *
 * NOTE(review): listing lines 385 (presumably the declaration of bp) and 387
 * are missing from this extract; confirm against the full source.
 */
382 static void mem_xo_buf_deregister(struct m0_net_buffer *nb)
383 {
384  struct m0_net_bulk_mem_domain_pvt *dp;
386 
388  dp = mem_dom_to_pvt(nb->nb_dom);
389  bp = mem_buffer_to_pvt(nb);
390  m0_list_link_fini(&bp->xb_wi.xwi_link);
    /* For a derived transport the private data is left in place. */
391  if (!dp->xd_derived) {
392  m0_free(bp);
393  nb->nb_xprt_private = NULL;
394  }
395  return;
396 }
397 
/*
 * Transport op: adds a registered buffer to a transfer machine queue.
 *
 * Requirements visible here: nb->nb_offset must be 0, the TM mutex must be
 * held by the caller, and the M0_NET_BUF_IN_USE flag must be clear. Returns
 * -EPERM when the TM's private state is beyond M0_NET_XTM_STARTED.
 *
 * The work item embedded in the private buffer data starts with opcode
 * M0_NET_XOP_NR. The switch on nb->nb_qtype may replace it. Afterwards the
 * item's status is preset to -1, and the item is handed to mem_wi_add()
 * only if its opcode is no longer M0_NET_XOP_NR.
 *
 * Returns 0 on success, or the error from mem_bmo_desc_create().
 *
 * NOTE(review): many listing lines are missing from this extract: 406
 * (presumably the declaration of bp), 410-411, 417, 434, 436, 439, 447 and
 * 449-450. Several of them are `case` labels and opcode assignments, so the
 * mapping from queue type to action cannot be fully stated from what is
 * visible. The fragment on line 412 is the tail of a precondition.
 */
401 static int mem_xo_buf_add(struct m0_net_buffer *nb)
402 {
403  struct m0_net_transfer_mc *tm;
404  struct m0_net_bulk_mem_tm_pvt *tp;
405  struct m0_net_bulk_mem_domain_pvt *dp;
407  struct m0_net_bulk_mem_work_item *wi;
408  int rc;
409 
412  (nb->nb_flags & M0_NET_BUF_IN_USE) == 0);
413 
414  M0_PRE(nb->nb_offset == 0); /* don't support any other value */
415 
416  tm = nb->nb_tm;
418  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
419  tp = mem_tm_to_pvt(tm);
420 
421  if (tp->xtm_state > M0_NET_XTM_STARTED)
422  return M0_ERR(-EPERM);
423 
424  dp = mem_dom_to_pvt(tm->ntm_dom);
425  bp = mem_buffer_to_pvt(nb);
426  wi = &bp->xb_wi;
    /* M0_NET_XOP_NR acts as the "nothing to queue" marker tested below. */
427  wi->xwi_op = M0_NET_XOP_NR;
428 
429  switch (nb->nb_qtype) {
430  case M0_NET_QT_MSG_RECV:
431  break;
432  case M0_NET_QT_MSG_SEND:
433  M0_ASSERT(nb->nb_ep != NULL);
435  break;
    /* (case label on listing line 436 is absent here) */
437  nb->nb_length = 0;
438  /* fallthrough */
    /* (case label on listing line 439 is absent here) */
    /* A fresh buffer id is taken from the domain-wide counter. */
440  bp->xb_buf_id = ++dp->xd_buf_id_counter;
441  rc = mem_bmo_desc_create(&nb->nb_desc, tm,
442  nb->nb_qtype, nb->nb_length,
443  bp->xb_buf_id);
444  if (rc != 0)
445  return M0_RC(rc);
446  break;
    /* (case labels and statements on lines 447, 449-450 are absent here) */
448  /* fallthrough */
451  break;
452  default:
453  M0_IMPOSSIBLE("invalid queue type");
454  break;
455  }
456  wi->xwi_status = -1;
457 
458  if (wi->xwi_op != M0_NET_XOP_NR) {
459  mem_wi_add(wi, tp);
460  }
461 
462  return 0;
463 }
464 
/*
 * Transport op: requests removal of a buffer from its transfer machine
 * queue.
 *
 * The caller must hold the TM mutex (M0_PRE). Nothing is done when the
 * buffer has M0_NET_BUF_IN_USE set. Otherwise the action depends on the
 * queue type: in the group that includes M0_NET_QT_MSG_RECV the work item is
 * passed to mem_wi_add(), while in the group that includes
 * M0_NET_QT_MSG_SEND it is not re-added (per the original comment, those are
 * already queued).
 *
 * NOTE(review): listing lines 472 (presumably the declaration and setting
 * of bp), 477-478, 481, 489, 491, 495-496, 498, 503-504 and 506 are missing
 * from this extract. They include the statement controlled by the
 * M0_NET_BUF_TIMED_OUT test, further `case` labels and the opcode
 * assignments, so the exact effect cannot be stated from what is visible.
 */
470 static void mem_xo_buf_del(struct m0_net_buffer *nb)
471 {
473  struct m0_net_transfer_mc *tm;
474  struct m0_net_bulk_mem_tm_pvt *tp;
475  struct m0_net_bulk_mem_work_item *wi;
476 
479 
480  tm = nb->nb_tm;
482  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
483  tp = mem_tm_to_pvt(tm);
484 
    /* A buffer flagged in-use is left alone. */
485  if (nb->nb_flags & M0_NET_BUF_IN_USE)
486  return;
487 
488  wi = &bp->xb_wi;
    /* (the statement controlled by this `if`, line 491, is absent here) */
490  if (!(nb->nb_flags & M0_NET_BUF_TIMED_OUT))
492 
493  switch (nb->nb_qtype) {
494  case M0_NET_QT_MSG_RECV:
497  /* must be added to the work list */
499  mem_wi_add(wi, tp);
500  break;
501 
502  case M0_NET_QT_MSG_SEND:
505  /* these are already queued */
507  break;
508 
509  default:
510  M0_IMPOSSIBLE("invalid queue type");
511  break;
512  }
513 }
514 
/*
 * Transport op: initializes the bulk-mem private data of a transfer machine.
 *
 * For a derived transport the private data must already be attached to tm;
 * otherwise it must be absent and is allocated here (-ENOMEM on failure).
 * The back pointer xtm_tm is set, and the work-list condition variable is
 * initialized against the TM's own mutex (tm->ntm_mutex).
 *
 * Returns 0 on success.
 *
 * NOTE(review): listing lines 531, 546, 549-550 and 552 are missing from
 * this extract. Presumably they set the initial state, the worker count and
 * the work list (the index lists xtm_state, xtm_work_list and
 * m0_list_init()); confirm against the full source.
 */
526 static int mem_xo_tm_init(struct m0_net_transfer_mc *tm)
527 {
528  struct m0_net_bulk_mem_domain_pvt *dp;
529  struct m0_net_bulk_mem_tm_pvt *tp;
530 
532 
533  dp = mem_dom_to_pvt(tm->ntm_dom);
    /* Derived transports supply the private data; the base one allocates. */
534  if (dp->xd_derived) {
535  M0_PRE(tm->ntm_xprt_private != NULL);
536  tp = tm->ntm_xprt_private;
537  } else {
538  M0_PRE(tm->ntm_xprt_private == NULL);
539  M0_ALLOC_PTR(tp);
540  if (tp == NULL)
541  return M0_ERR(-ENOMEM);
542  tm->ntm_xprt_private = tp;
543  }
544  M0_ASSERT(mem_tm_to_pvt(tm) == tp);
545 
547  /* defer allocation of thread array to start time */
548  tp->xtm_tm = tm;
551  m0_cond_init(&tp->xtm_work_list_cv, &tm->ntm_mutex);
553  return 0;
554 }
555 
/*
 * Transport op: finalizes the bulk-mem private data of a transfer machine.
 *
 * Under the TM mutex the private state is forced to M0_NET_XTM_STOPPED so
 * that the workers stop. If a worker thread array exists, each of the
 * xtm_num_workers entries is visited. Finally the back pointer is cleared
 * and, unless the transport is derived, the private data is detached from
 * tm and freed.
 *
 * NOTE(review): listing lines 567-568, 570, 575, 580-581, 583 and 585-586
 * are missing from this extract. The fragment on line 569 belongs to a
 * precondition on the TM state. The index lists m0_cond_broadcast(),
 * m0_thread_join(), m0_cond_fini() and m0_list_fini(); presumably these are
 * the dropped calls that wake and join the workers and release the
 * array, condition variable and list. Confirm against the full source.
 */
562 static void mem_xo_tm_fini(struct m0_net_transfer_mc *tm)
563 {
564  struct m0_net_bulk_mem_domain_pvt *dp;
565  struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
566 
569  tp->xtm_state == M0_NET_XTM_FAILED ||
571 
572  dp = mem_dom_to_pvt(tm->ntm_dom);
573  m0_mutex_lock(&tm->ntm_mutex);
574  tp->xtm_state = M0_NET_XTM_STOPPED; /* to stop the workers */
576  m0_mutex_unlock(&tm->ntm_mutex);
577  if (tp->xtm_worker_threads != NULL) {
578  int i;
    /* (the loop body, listing lines 580-581, is absent here) */
579  for (i = 0; i < tp->xtm_num_workers; ++i) {
582  }
584  }
587  tp->xtm_tm = NULL;
    /* For a derived transport the private data is left in place. */
588  if (!dp->xd_derived) {
589  tm->ntm_xprt_private = NULL;
590  m0_free(tp);
591  }
592 }
593 
595  *tm, size_t num)
596 {
    /*
     * Sets the number of worker threads (xtm_num_workers) of a bulk-mem
     * transfer machine to num, under the TM mutex.
     *
     * NOTE(review): listing lines 594 (start of the signature), 598 and
     * 600-601 are missing from this extract. Per the cross-reference index
     * the signature is
     *   M0_INTERNAL void m0_net_bulk_mem_tm_set_num_threads(
     *           struct m0_net_transfer_mc *tm, size_t num)
     * The dropped body lines are presumably preconditions (e.g. the TM
     * invariant and state checks); confirm against the full source.
     */
597  struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
599  m0_mutex_lock(&tm->ntm_mutex);
602  tp->xtm_num_workers = num;
603  m0_mutex_unlock(&tm->ntm_mutex);
604 }
605 
/*
 * Returns the configured number of worker threads (xtm_num_workers) of a
 * bulk-mem transfer machine. No lock is taken in the visible lines.
 *
 * NOTE(review): listing line 610 is missing from this extract; presumably a
 * precondition on tm. Confirm against the full source.
 */
606 M0_INTERNAL size_t m0_net_bulk_mem_tm_get_num_threads(const struct
607  m0_net_transfer_mc *tm)
608 {
609  struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
611  return tp->xtm_num_workers;
612 }
613 
/*
 * Transport op: starts a transfer machine at address addr.
 *
 * The caller must hold the TM mutex (M0_PRE). A TM whose private state is
 * already STARTED or STARTING returns 0 immediately. Otherwise the function
 *   1. allocates the worker thread array if it does not exist yet,
 *   2. allocates a state-change work item targeting M0_NET_XTM_STARTED,
 *   3. creates the TM end point through the domain transport's
 *      xo_end_point_create op, storing it in the work item,
 *   4. starts xtm_num_workers worker threads, stopping at the first failure,
 *   5. on success queues the work item with mem_wi_add(); on failure
 *      finalizes its link and frees it.
 *
 * Returns 0 on success, -EPERM on the path guarded by the missing line 630,
 * -ENOMEM on allocation failure, or the error from end point creation or
 * thread start.
 *
 * NOTE(review): listing lines 622, 630, 637, 662, 669 and 672 are missing
 * from this extract:
 *  - 630 is the condition guarding `return M0_ERR(-EPERM)`, presumably a
 *    check that the state is the initial one;
 *  - 637 presumably allocates tp->xtm_worker_threads (the index lists
 *    M0_ALLOC_ARR);
 *  - 662 is the start of the M0_THREAD_INIT() call whose arguments continue
 *    on lines 663-665;
 *  - 669 and 672 presumably set the transitional and failed states.
 * NOTE(review): the two failure paths after the work item is allocated
 * differ. The end-point failure path frees the item without
 * m0_list_link_fini(), and the thread-start failure path shows no release
 * of the end point stored in wi_st_chg->xwi_nbe_ep. Confirm whether either
 * is intentional.
 */
614 static int mem_xo_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
615 {
616  struct m0_net_bulk_mem_tm_pvt *tp;
617  struct m0_net_bulk_mem_work_item *wi_st_chg;
618  const struct m0_net_xprt *xprt;
619  int rc = 0;
620  int i;
621 
623  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
624 
625  tp = mem_tm_to_pvt(tm);
    /* Starting an already started or starting TM is a successful no-op. */
626  if (tp->xtm_state == M0_NET_XTM_STARTED)
627  return 0;
628  if (tp->xtm_state == M0_NET_XTM_STARTING)
629  return 0;
    /* (the condition on listing line 630 is absent here) */
631  return M0_ERR(-EPERM);
632 
633  /* allocate worker thread array */
634  if (tp->xtm_worker_threads == NULL) {
635  /* allocation creates parked threads in case of failure */
636  M0_CASSERT(TS_PARKED == 0);
    /* (the allocation on listing line 637 is absent here) */
638  if (tp->xtm_worker_threads == NULL)
639  return M0_ERR(-ENOMEM);
640  }
641 
642  /* allocate a state change work item */
643  M0_ALLOC_PTR(wi_st_chg);
644  if (wi_st_chg == NULL)
645  return M0_ERR(-ENOMEM);
646  m0_list_link_init(&wi_st_chg->xwi_link);
647  wi_st_chg->xwi_op = M0_NET_XOP_STATE_CHANGE;
648  wi_st_chg->xwi_next_state = M0_NET_XTM_STARTED;
649  wi_st_chg->xwi_status = 0;
650 
651  /* create the end point (indirectly via the transport ops vector) */
652  xprt = tm->ntm_dom->nd_xprt;
653  rc = (*xprt->nx_ops->xo_end_point_create)(&wi_st_chg->xwi_nbe_ep,
654  tm, addr);
655  if (rc != 0) {
656  m0_free(wi_st_chg);
657  return M0_RC(rc);
658  }
659 
660  /* start worker threads */
    /* `rc == 0` in the loop condition stops at the first failed start. */
661  for (i = 0; i < tp->xtm_num_workers && rc == 0; ++i)
663  struct m0_net_transfer_mc *, NULL,
664  &mem_xo_tm_worker, tm,
665  "mem_tm_worker%d", i);
666 
667  if (rc == 0) {
668  /* set transition state and add the state change work item */
670  mem_wi_add(wi_st_chg, tp);
671  } else {
673  m0_list_link_fini(&wi_st_chg->xwi_link);
674  m0_free(wi_st_chg); /* fini cleans up threads */
675  }
676 
677  return M0_RC(rc);
678 }
679 
/*
 * Transport op: initiates the shutdown of a transfer machine.
 *
 * The caller must hold the TM mutex (M0_PRE). A TM whose private state is
 * already M0_NET_XTM_STOPPING or beyond returns 0 immediately. Otherwise a
 * state-change work item targeting M0_NET_XTM_STOPPED is allocated. If
 * cancel is true, m0_net__tm_cancel() is called on the TM. The work item is
 * then queued with mem_wi_add().
 *
 * Returns 0 on success, -ENOMEM if the work item cannot be allocated.
 *
 * NOTE(review): listing lines 685 and 701 are missing from this extract.
 * Line 701 presumably sets the transitional state announced by the comment
 * just above it; confirm against the full source.
 */
680 static int mem_xo_tm_stop(struct m0_net_transfer_mc *tm, bool cancel)
681 {
682  struct m0_net_bulk_mem_tm_pvt *tp;
683  struct m0_net_bulk_mem_work_item *wi_st_chg;
684 
686  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
687 
688  tp = mem_tm_to_pvt(tm);
    /* Stopping a TM that is already stopping or stopped is a no-op. */
689  if (tp->xtm_state >= M0_NET_XTM_STOPPING)
690  return 0;
691  /* allocate a state change work item */
692  M0_ALLOC_PTR(wi_st_chg);
693  if (wi_st_chg == NULL)
694  return M0_ERR(-ENOMEM);
695  m0_list_link_init(&wi_st_chg->xwi_link);
696  wi_st_chg->xwi_op = M0_NET_XOP_STATE_CHANGE;
697  wi_st_chg->xwi_next_state = M0_NET_XTM_STOPPED;
698  if (cancel)
699  m0_net__tm_cancel(tm);
700  /* set transition state and add the state change work item */
702  mem_wi_add(wi_st_chg, tp);
703 
704  return 0;
705 }
706 
708 {
    /*
     * Returns the size of the bulk-mem buffer descriptor, struct mem_desc.
     *
     * NOTE(review): listing lines 707 (the signature) and 709 are missing
     * from this extract. Per the cross-reference index the signature is
     *   static m0_bcount_t mem_xo_get_max_buffer_desc_size(
     *           const struct m0_net_domain *dom)
     */
710 
711  return sizeof(struct mem_desc);
712 }
713 
714 /* Internal methods of this transport; visible to derived transports. */
/*
 * Table of bulk-mem internal methods. mem_xo_dom_init() installs its address
 * in the domain private data (dp->xd_ops).
 *
 * NOTE(review): listing lines 717-723, the entries of .bmo_work_fn, are
 * missing from this extract. The cross-reference index lists the mem_wf_*
 * work functions (state_change, cancel_cb, msg_recv_cb, msg_send,
 * passive_bulk_cb, active_bulk, error_cb), which are presumably the entries;
 * confirm against the full source.
 */
715 static const struct m0_net_bulk_mem_ops mem_xprt_methods = {
716  .bmo_work_fn = {
724  },
725  .bmo_ep_create = mem_ep_create,
726  .bmo_ep_alloc = mem_ep_alloc,
727  .bmo_ep_free = mem_ep_free,
728  .bmo_ep_release = mem_xo_end_point_release,
729  .bmo_ep_get = mem_ep_get,
730  .bmo_wi_add = mem_wi_add,
731  .bmo_buffer_in_bounds = mem_buffer_in_bounds,
732  .bmo_desc_create = mem_desc_create,
733  .bmo_post_error = mem_post_error,
734  .bmo_wi_post_buffer_event = mem_wi_post_buffer_event,
735 };
736 
737 /* External interface */
/*
 * Transport operations vector of the bulk-mem transport; it maps each
 * m0_net_xprt_ops slot to the mem_xo_* function defined in this file. The
 * four .xo_rpc_* slots use the generic default_xo_rpc_* helpers.
 *
 * NOTE(review): listing line 739 is missing from this extract; it is
 * presumably `.xo_dom_init = mem_xo_dom_init,`. Confirm.
 */
738 static const struct m0_net_xprt_ops mem_xo_xprt_ops = {
740  .xo_dom_fini = mem_xo_dom_fini,
741  .xo_get_max_buffer_size = mem_xo_get_max_buffer_size,
742  .xo_get_max_buffer_segment_size = mem_xo_get_max_buffer_segment_size,
743  .xo_get_max_buffer_segments = mem_xo_get_max_buffer_segments,
744  .xo_end_point_create = mem_xo_end_point_create,
745  .xo_buf_register = mem_xo_buf_register,
746  .xo_buf_deregister = mem_xo_buf_deregister,
747  .xo_buf_add = mem_xo_buf_add,
748  .xo_buf_del = mem_xo_buf_del,
749  .xo_tm_init = mem_xo_tm_init,
750  .xo_tm_fini = mem_xo_tm_fini,
751  .xo_tm_start = mem_xo_tm_start,
752  .xo_tm_stop = mem_xo_tm_stop,
753  .xo_get_max_buffer_desc_size = mem_xo_get_max_buffer_desc_size,
754 
755  .xo_rpc_max_seg_size = default_xo_rpc_max_seg_size,
756  .xo_rpc_max_segs_nr = default_xo_rpc_max_segs_nr,
757  .xo_rpc_max_msg_size = default_xo_rpc_max_msg_size,
758  .xo_rpc_max_recv_msgs = default_xo_rpc_max_recv_msgs,
759 };
760 
    /*
     * The bulk-mem transport descriptor: named "bulk-mem" and bound to the
     * mem_xo_xprt_ops operations vector.
     *
     * NOTE(review): listing line 761, the opening of this definition, is
     * missing from this extract. Per the cross-reference index it is
     *   const struct m0_net_xprt m0_net_bulk_mem_xprt
     */
762  .nx_name = "bulk-mem",
763  .nx_ops = &mem_xo_xprt_ops
764 };
765 
766 #undef M0_TRACE_SUBSYSTEM
767  /* bulkmem */
769 
770 /*
771  * Local variables:
772  * c-indentation-style: "K&R"
773  * c-basic-offset: 8
774  * tab-width: 8
775  * fill-column: 80
776  * scroll-step: 1
777  * End:
778  */
struct m0_list_link xwi_link
Definition: mem_xprt.h:158
static bool mem_dom_invariant(const struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:163
const struct m0_net_xprt_ops * nx_ops
Definition: net.h:126
M0_INTERNAL void m0_list_link_fini(struct m0_list_link *link)
Definition: list.c:176
static struct m0_net_bulk_mem_end_point * mem_ep_alloc(void)
Definition: mem_xprt_ep.c:86
static struct m0_addb2_philter p
Definition: consumer.c:40
static void mem_wf_msg_send(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_msg.c:166
static void mem_wf_msg_recv_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_msg.c:30
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
struct m0_cond xtm_work_list_cv
Definition: mem_xprt.h:218
M0_INTERNAL void m0_list_add(struct m0_list *head, struct m0_list_link *new)
Definition: list.c:113
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static int mem_xo_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: mem_xprt_xo.c:614
enum m0_net_bulk_mem_tm_state xtm_state
Definition: mem_xprt.h:214
static m0_bcount_t mem_xo_get_max_buffer_desc_size(const struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:707
static bool mem_ep_invariant(const struct m0_net_end_point *ep)
Definition: mem_xprt_xo.c:169
M0_INTERNAL void m0_net_bulk_mem_tm_set_num_threads(struct m0_net_transfer_mc *tm, size_t num)
Definition: mem_xprt_xo.c:594
static struct m0_net_buffer * mem_wi_to_buffer(struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt.h:386
#define NULL
Definition: misc.h:38
m0_bindex_t nb_offset
Definition: net.h:1344
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL void m0_net__tm_cancel(struct m0_net_transfer_mc *tm)
Definition: tm.c:145
static bool mem_bmo_buffer_in_bounds(const struct m0_net_buffer *nb)
Definition: mem_xprt_pvt.h:110
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
M0_INTERNAL void m0_list_init(struct m0_list *head)
Definition: list.c:29
static void mem_wi_post_buffer_event(struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_xo.c:147
struct m0_list_link xd_dom_linkage
Definition: mem_xprt.h:355
#define M0_CASSERT(cond)
static void mem_wi_add(struct m0_net_bulk_mem_work_item *wi, struct m0_net_bulk_mem_tm_pvt *tp)
Definition: mem_xprt_xo.c:137
static struct m0_list mem_domains
Definition: mem_xprt_xo.c:39
struct m0_vec ov_vec
Definition: vec.h:147
enum m0_net_tm_state ntm_state
Definition: net.h:819
struct m0_thread * xtm_worker_threads
Definition: mem_xprt.h:222
static void mem_wf_cancel_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:107
static const struct m0_net_bulk_mem_ops mem_xprt_methods
Definition: mem_xprt_xo.c:42
m0_bcount_t nb_length
Definition: net.h:1334
M0_INTERNAL void m0_list_fini(struct m0_list *head)
Definition: list.c:36
enum m0_net_bulk_mem_work_opcode xwi_op
Definition: mem_xprt.h:163
uint64_t nb_flags
Definition: net.h:1489
Definition: vec.h:49
enum m0_net_bulk_mem_tm_state xwi_next_state
Definition: mem_xprt.h:166
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
Definition: list.c:147
struct m0_net_domain * ntm_dom
Definition: net.h:853
M0_INTERNAL void m0_net_xprt_default_set(const struct m0_net_xprt *xprt)
Definition: net.c:143
struct m0_net_buffer s_nb
Definition: cp.c:36
static void mem_wf_state_change(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:35
M0_INTERNAL int m0_mem_xprt_init(void)
Definition: mem_xprt_xo.c:47
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
static int mem_xo_buf_register(struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:346
m0_time_t nbe_time
Definition: net.h:1197
static int void * buf
Definition: dir.c:1019
struct m0_list xtm_work_list
Definition: mem_xprt.h:216
struct m0_net_end_point * xwi_nbe_ep
Definition: mem_xprt.h:179
const char * nep_addr
Definition: net.h:503
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
M0_INTERNAL void m0_cond_init(struct m0_cond *cond, struct m0_mutex *mutex)
Definition: cond.c:40
Definition: sock.c:887
static int mem_xo_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: mem_xprt_xo.c:290
M0_INTERNAL uint32_t default_xo_rpc_max_segs_nr(struct m0_net_domain *ndom)
Definition: net.c:247
static void mem_xo_buf_del(struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:470
static int mem_bmo_desc_create(struct m0_net_buf_desc *desc, struct m0_net_transfer_mc *tm, enum m0_net_queue_type qt, m0_bcount_t buflen, int64_t buf_id)
Definition: mem_xprt_pvt.h:121
return M0_RC(rc)
Definition: sock.c:754
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_net_bulk_mem_end_point * mem_ep_to_pvt(const struct m0_net_end_point *ep)
Definition: mem_xprt.h:262
int i
Definition: dir.c:1033
static void mem_post_error(struct m0_net_transfer_mc *tm, int status)
M0_INTERNAL void m0_net_xprt_register(const struct m0_net_xprt *xprt)
Definition: net.c:182
return M0_ERR(-EOPNOTSUPP)
void * ntm_xprt_private
Definition: net.h:886
static void mem_xo_dom_fini(struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:245
static m0_bcount_t mem_xo_get_max_buffer_size(const struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:257
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_copy(struct m0_bufvec_cursor *dcur, struct m0_bufvec_cursor *scur, m0_bcount_t num_bytes)
Definition: vec.c:620
static int mem_xo_buf_add(struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:401
m0_bcount_t xwi_nbe_length
Definition: mem_xprt.h:176
static void mem_xo_buf_deregister(struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:382
#define M0_ASSERT(cond)
static int mem_copy_buffer(struct m0_net_buffer *d_nb, struct m0_net_buffer *s_nb, m0_bcount_t num_bytes)
Definition: mem_xprt_xo.c:113
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
const struct m0_net_xprt * nd_xprt
Definition: net.h:396
static void mem_xo_end_point_release(struct m0_ref *ref)
Definition: mem_xprt_ep.c:40
M0_INTERNAL void m0_cond_fini(struct m0_cond *cond)
Definition: cond.c:46
m0_time_t m0_time_now(void)
Definition: time.c:134
#define M0_DEFAULT_NETWORK
Definition: config.h:281
#define m0_streq(a, b)
Definition: string.h:34
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
M0_INTERNAL void m0_cond_signal(struct m0_cond *cond)
Definition: cond.c:94
M0_INTERNAL m0_bcount_t default_xo_rpc_max_seg_size(struct m0_net_domain *ndom)
Definition: net.c:239
struct m0_net_domain * nb_dom
Definition: net.h:1351
M0_INTERNAL bool m0_list_contains(const struct m0_list *list, const struct m0_list_link *link)
Definition: list.c:87
#define M0_POST(cond)
Definition: xcode.h:73
static void mem_wf_active_bulk(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_bulk.c:56
static void mem_xo_tm_worker(struct m0_net_transfer_mc *tm)
Definition: mem_xprt_tm.c:180
uint32_t v_nr
Definition: vec.h:51
static m0_bcount_t mem_xo_get_max_buffer_segment_size(const struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:263
m0_bcount_t * v_count
Definition: vec.h:53
static const struct m0_net_xprt_ops mem_xo_xprt_ops
Definition: mem_xprt_xo.c:738
static struct m0_net_bulk_mem_buffer_pvt * mem_buffer_to_pvt(const struct m0_net_buffer *nb)
Definition: mem_xprt.h:202
m0_net_bulk_mem_work_fn_t bmo_work_fn[M0_NET_XOP_NR]
Definition: mem_xprt.h:281
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
Definition: list.h:72
M0_INTERNAL void m0_mem_xprt_fini(void)
Definition: mem_xprt_xo.c:59
static bool mem_buffer_in_bounds(const struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:85
int(* xo_end_point_create)(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: net.h:230
static int mem_xo_tm_init(struct m0_net_transfer_mc *tm)
Definition: mem_xprt_xo.c:526
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static void mem_wf_passive_bulk_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_bulk.c:33
static m0_bcount_t mem_buffer_length(const struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:77
const struct m0_net_bulk_mem_ops * xd_ops
Definition: mem_xprt.h:338
const char * nx_name
Definition: net.h:125
static bool mem_buffer_invariant(const struct m0_net_buffer *nb)
Definition: mem_xprt_xo.c:176
M0_INTERNAL void m0_net_xprt_deregister(const struct m0_net_xprt *xprt)
Definition: net.c:197
int(* xo_dom_init)(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: net.h:139
M0_INTERNAL void m0_cond_broadcast(struct m0_cond *cond)
Definition: cond.c:100
struct m0_net_domain * xd_dom
Definition: mem_xprt.h:335
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
static bool mem_tm_invariant(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt_xo.c:183
static struct bulkio_params * bp
Definition: bulkio_ut.c:44
struct m0_net_end_point xep_ep
Definition: mem_xprt.h:252
static int mem_ep_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const struct sockaddr_in *sa, uint32_t id)
Definition: mem_xprt_ep.c:109
M0_INTERNAL size_t m0_net_bulk_mem_tm_get_num_threads(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt_xo.c:606
M0_INTERNAL void m0_list_link_init(struct m0_list_link *link)
Definition: list.c:169
struct m0_net_xprt * xprt
Definition: module.c:61
static struct m0_net_bulk_mem_domain_pvt * mem_dom_to_pvt(const struct m0_net_domain *dom)
Definition: mem_xprt.h:375
M0_INTERNAL void m0_list_add_tail(struct m0_list *head, struct m0_list_link *new)
Definition: list.c:119
static void mem_ep_get(struct m0_net_end_point *ep)
Definition: mem_xprt_ep.c:101
char xep_addr[M0_NET_BULK_MEM_XEP_ADDR_LEN]
Definition: mem_xprt.h:255
static void mem_xo_tm_fini(struct m0_net_transfer_mc *tm)
Definition: mem_xprt_xo.c:562
static int mem_desc_create(struct m0_net_buf_desc *desc, struct m0_net_transfer_mc *tm, enum m0_net_queue_type qt, m0_bcount_t buflen, int64_t buf_id)
Definition: mem_xprt_ep.c:205
static int mem_xo_dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:200
int num
Definition: bulk_if.c:54
static struct m0_net_bulk_mem_tm_pvt * mem_tm_to_pvt(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt.h:231
M0_INTERNAL uint32_t default_xo_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.c:267
void m0_free(void *data)
Definition: memory.c:146
static void mem_ep_free(struct m0_net_bulk_mem_end_point *mep)
Definition: mem_xprt_ep.c:96
struct m0_net_transfer_mc * xtm_tm
Definition: mem_xprt.h:212
M0_INTERNAL m0_bcount_t default_xo_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.c:255
void * nb_xprt_private
Definition: net.h:1461
int32_t rc
Definition: trigger_fop.h:47
static int32_t mem_xo_get_max_buffer_segments(const struct m0_net_domain *dom)
Definition: mem_xprt_xo.c:270
static int mem_xo_tm_stop(struct m0_net_transfer_mc *tm, bool cancel)
Definition: mem_xprt_xo.c:680
static void mem_wf_error_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:129
const struct m0_net_xprt m0_net_bulk_mem_xprt
Definition: mem_xprt_xo.c:761
struct m0_net_end_point * nb_ep
Definition: net.h:1424
enum m0_thread_state t_state
Definition: thread.h:111
static int mem_bmo_ep_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const struct sockaddr_in *sa, uint32_t id)
Definition: mem_xprt_pvt.h:96
#define M0_IMPOSSIBLE(fmt,...)