Motr  M0
rpc_machine.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
30 #include "lib/trace.h"
31 #include "lib/misc.h"
32 #include "lib/memory.h"
33 #include "lib/errno.h"
34 #include "lib/finject.h" /* M0_FI_ENABLED */
35 #include "addb2/addb2.h"
36 #include "motr/magic.h"
37 #include "cob/cob.h"
38 #include "net/net.h"
39 #include "net/buffer_pool.h" /* m0_net_buffer_pool_[lock|unlock] */
40 #include "reqh/reqh.h"
41 #include "rpc/addb2.h"
42 #include "rpc/rpc_internal.h"
43 #include "net/lnet/lnet.h"
44 
45 /* Forward declarations. */
46 static void rpc_tm_cleanup(struct m0_rpc_machine *machine);
47 static int rpc_tm_setup(struct m0_net_transfer_mc *tm,
48  struct m0_net_domain *net_dom,
49  const char *ep_addr,
50  struct m0_net_buffer_pool *pool,
51  uint32_t colour,
52  m0_bcount_t msg_size,
53  uint32_t qlen);
54 static int __rpc_machine_init(struct m0_rpc_machine *machine);
55 static void __rpc_machine_fini(struct m0_rpc_machine *machine);
56 M0_INTERNAL void rpc_worker_thread_fn(struct m0_rpc_machine *machine);
57 static struct m0_rpc_chan *rpc_chan_locate(struct m0_rpc_machine *machine,
58  struct m0_net_end_point *dest_ep);
59 static int rpc_chan_create(struct m0_rpc_chan **chan,
60  struct m0_rpc_machine *machine,
61  struct m0_net_end_point *dest_ep,
62  uint64_t max_packets_in_flight);
63 static void rpc_chan_ref_release(struct m0_ref *ref);
64 static void rpc_recv_pool_buffer_put(struct m0_net_buffer *nb);
65 static void buf_recv_cb(const struct m0_net_buffer_event *ev);
66 static void net_buf_received(struct m0_net_buffer *nb,
68  m0_bcount_t length,
69  struct m0_net_end_point *from_ep);
70 static void packet_received(struct m0_rpc_packet *p,
71  struct m0_rpc_machine *machine,
72  struct m0_net_end_point *from_ep);
73 static void item_received(struct m0_rpc_item *item,
74  struct m0_net_end_point *from_ep);
75 static void net_buf_err(struct m0_net_buffer *nb, int32_t status);
76 
/*
 * bob type descriptor for struct m0_rpc_machine.
 *
 * Names the type "rpc_machine", records where the magic field (rm_magix)
 * lives inside the structure and which value (M0_RPC_MACHINE_MAGIC) it is
 * expected to hold. No extra check callback is installed (.bt_check is NULL).
 */
77 static const struct m0_bob_type rpc_machine_bob_type = {
78  .bt_name = "rpc_machine",
79  .bt_magix_offset = M0_MAGIX_OFFSET(struct m0_rpc_machine, rm_magix),
80  .bt_magix = M0_RPC_MACHINE_MAGIC,
81  .bt_check = NULL
82 };
83 
85 
90  .nbc_cb = {
92  }
93 };
94 
95 M0_TL_DESCR_DEFINE(rpc_chan, "rpc_channels", static, struct m0_rpc_chan,
96  rc_linkage, rc_magic, M0_RPC_CHAN_MAGIC,
98 M0_TL_DEFINE(rpc_chan, static, struct m0_rpc_chan);
99 
100 M0_TL_DESCR_DEFINE(rmach_watch, "rpc_machine_watch", M0_INTERNAL,
101  struct m0_rpc_machine_watch, mw_linkage, mw_magic,
104 M0_TL_DEFINE(rmach_watch, M0_INTERNAL, struct m0_rpc_machine_watch);
105 
/*
 * Typed list "rpc_conn" of struct m0_rpc_conn objects, linked through c_link
 * and tagged through c_magic. The rpc_conn_tlist_*() and rpc_conn_tlink_*()
 * helpers used in this file on rm_incoming_conns and rm_outgoing_conns come
 * from this definition.
 */
106 M0_TL_DESCR_DEFINE(rpc_conn, "rpc-conn", M0_INTERNAL, struct m0_rpc_conn,
107  c_link, c_magic, M0_RPC_CONN_MAGIC, M0_RPC_CONN_HEAD_MAGIC);
108 M0_TL_DEFINE(rpc_conn, M0_INTERNAL, struct m0_rpc_conn);
109 
/*
 * Transfer machine event callback. The event is accepted and ignored.
 *
 * NOTE(review): presumably installed as the event callback of the rpc
 * machine's transfer machine; the callbacks initialiser is not visible in
 * this listing -- confirm.
 */
110 static void rpc_tm_event_cb(const struct m0_net_tm_event *ev)
111 {
112  /* Do nothing */
113 }
114 
121 };
122 
124  struct m0_net_domain *net_dom,
125  const char *ep_addr,
126  struct m0_reqh *reqh,
127  struct m0_net_buffer_pool *receive_pool,
128  uint32_t colour,
129  m0_bcount_t msg_size, uint32_t queue_len)
130 {
131  m0_bcount_t max_msg_size;
132  int rc;
133 
134  M0_ENTRY("machine: %p, net_dom: %p, ep_addr: %s, "
135  "reqh:%p", machine, net_dom, (char *)ep_addr, reqh);
136  M0_PRE(machine != NULL);
137  M0_PRE(ep_addr != NULL);
138  M0_PRE(net_dom != NULL);
139  M0_PRE(receive_pool != NULL);
140  M0_PRE(reqh != NULL);
141 
142  if (M0_FI_ENABLED("fake_error"))
143  return M0_ERR(-EINVAL);
144 
145  M0_SET0(machine);
146  max_msg_size = m0_rpc_max_msg_size(net_dom, msg_size);
147  machine->rm_reqh = reqh;
148  machine->rm_min_recv_size = max_msg_size;
149 
151  if (rc != 0)
152  return M0_ERR(rc);
153 
154  /* Bulk transmission requires data to be page aligned. */
155  machine->rm_bulk_cutoff = M0_FI_ENABLED("bulk_cutoff_4K") ? 4096 :
156  m0_align(max_msg_size / 2, m0_pagesize_get());
157  machine->rm_stopping = false;
159  NULL, &rpc_worker_thread_fn, machine, "m0_rpc_worker");
160  if (rc != 0)
161  goto err;
162 
163  rc = rpc_tm_setup(&machine->rm_tm, net_dom, ep_addr, receive_pool,
164  colour, msg_size, queue_len);
165  if (rc == 0)
166  return M0_RC(0);
167 
168  machine->rm_stopping = true;
171 err:
173  return M0_ERR(rc);
174 }
175 M0_EXPORTED(m0_rpc_machine_init);
176 
178 {
179  int rc;
180 
181  M0_ENTRY("machine: %p", machine);
182  rpc_chan_tlist_init(&machine->rm_chans);
183  rpc_conn_tlist_init(&machine->rm_incoming_conns);
184  rpc_conn_tlist_init(&machine->rm_outgoing_conns);
185  rmach_watch_tlist_init(&machine->rm_watch);
186 
188  if (rc != 0)
189  return M0_ERR(rc);
190 
191  m0_rpc_machine_bob_init(machine);
194  m0_reqh_rpc_mach_tlink_init_at_tail(machine,
196  return M0_RC(0);
197 }
198 
200 {
201  M0_ENTRY("machine %p", machine);
202 
203  m0_reqh_rpc_mach_tlink_del_fini(machine);
205 
207 
208  rmach_watch_tlist_fini(&machine->rm_watch);
209  rpc_conn_tlist_fini(&machine->rm_outgoing_conns);
210  rpc_conn_tlist_fini(&machine->rm_incoming_conns);
211  rpc_chan_tlist_fini(&machine->rm_chans);
212  m0_rpc_machine_bob_fini(machine);
213 
214  M0_LEAVE();
215 }
216 
218 {
219  struct m0_clink clink;
221 
224  while (machine->rm_active_nb > 0) {
228  }
231 }
232 
234 {
235  struct m0_rpc_machine_watch *watch;
236 
237  M0_ENTRY("machine: %p", machine);
238  M0_PRE(machine != NULL);
239 
241  machine->rm_stopping = true;
245 
246  M0_LOG(M0_INFO, "Waiting for RPC worker to join");
249 
251  M0_PRE(rpc_conn_tlist_is_empty(&machine->rm_outgoing_conns));
255 
256  /* Detach watchers if any */
257  m0_tl_for(rmach_watch, &machine->rm_watch, watch) {
258  rmach_watch_tlink_del_fini(watch);
259  if (watch->mw_mach_terminated != NULL)
260  watch->mw_mach_terminated(watch);
261  } m0_tl_endfor;
262 
265  M0_LEAVE();
266 }
267 M0_EXPORTED(m0_rpc_machine_fini);
268 
277  uint64_t h_sender_id;
278 };
279 
281 {
282  struct rpc_conn_holder *holder =
284  if (conn_state(holder->h_conn) == M0_RPC_CONN_FINALISED) {
285  /*
286  * Let m0_chan_wait wake up. Delete clink from chan so
287  * that chan can be finalized right after wait is finished
288  * along with connection finalizing.
289  *
290  * Please don't change this unless you really understand
291  * it well. --umka
292  */
293  m0_clink_del(&holder->h_clink);
294  return false;
295  }
296  return true;
297 }
298 
299 M0_INTERNAL void
301 {
302  struct m0_rpc_conn *conn;
303  struct rpc_conn_holder holder;
304 
306 
307  /*
308  * As we wait without the lock in the middle of the loop, some
309  * connections may go away in that time. This is why we re-check the
310  * list on every iteration and always pick the head to work with.
311  */
312  while (!rpc_conn_tlist_is_empty(&machine->rm_incoming_conns)) {
313  conn = rpc_conn_tlist_head(&machine->rm_incoming_conns);
314  /*
315  * It's possible that connection is in one of terminating
316  * states, but it isn't deleted from the list yet. Wait until
317  * the connection becomes M0_RPC_CONN_FINALISED and continue
318  * cleanup procedure for other connections.
319  */
322  /*
323  * Prepare a struct that allows finding the connection
324  * from the on-stack clink. Some local state is moved
325  * there too.
326  */
327  holder.h_conn = conn;
328  holder.h_sender_id = conn->c_sender_id;
329  m0_clink_init(&holder.h_clink,
331  m0_clink_add(&conn->c_sm.sm_chan, &holder.h_clink);
333 
334  m0_chan_wait(&holder.h_clink);
335  /*
336  * Clink was removed from the chan by
337  * rpc_conn__on_finalised_cb().
338  */
339  m0_clink_fini(&holder.h_clink);
340 
342  M0_ASSERT(m0_tl_find(rpc_conn,
344  holder.h_sender_id ==
345  conn->c_sender_id) == NULL);
346  } else {
347  /*
348  * There is little chance that the conn state is FAILED.
349  * We can deal with this problem later.
350  */
353  M0_LOG(M0_INFO, "Aborting conn %llu",
354  (unsigned long long)conn->c_sender_id);
357  &conn->c_ast);
359  }
360  }
361 
362  M0_POST(rpc_conn_tlist_is_empty(&machine->rm_incoming_conns));
363 }
364 
365 enum {
375  DRAIN_MAX = 128,
376 };
377 
378 /* Not static because formation ut requires it. */
379 M0_INTERNAL void rpc_worker_thread_fn(struct m0_rpc_machine *machine)
380 {
381  m0_time_t drained = m0_time_now();
382 
383  M0_ENTRY();
384  M0_PRE(machine != NULL);
385 
386  while (true) {
388  if (machine->rm_stopping) {
390  M0_LEAVE("RPC worker thread STOPPED");
391  return;
392  }
394  if (m0_time_is_in_past(drained + DRAIN_INTERVAL)) {
396  drained = m0_time_now();
397  }
400  }
401 }
402 
403 M0_INTERNAL void
405  uint32_t max_per_source)
406 {
407  struct m0_rpc_item_source *source;
408  struct m0_rpc_conn *conn;
409  struct m0_rpc_item *item;
410  m0_bcount_t max_size;
411  uint32_t sent;
412 
413  M0_ENTRY();
414 
416 
417  M0_LOG(M0_DEBUG, "max_size: %llu", (unsigned long long)max_size);
418  m0_tl_for(rpc_conn, &machine->rm_outgoing_conns, conn) {
419  m0_tl_for(item_source, &conn->c_item_sources, source) {
420  sent = 0;
421  while (sent < max_per_source &&
422  source->ris_ops->riso_has_item(source)) {
423  item = source->ris_ops->riso_get_item(source,
424  max_size);
425  if (item == NULL)
426  break;
427  M0_LOG(M0_DEBUG, "item: %p", item);
428  /*
429  * Avoid placing items to the urgent queue.
430  * Otherwise, an RPC packet would be created
431  * draining item sources over the limit to
432  * fill the packet.
433  * @see frm_fill_packet_from_item_sources()
434  */
436  DRAIN_INTERVAL / 2;
439  M0_CNT_INC(sent);
440  }
441  } m0_tl_endfor;
442  } m0_tl_endfor;
443 
444  M0_LEAVE();
445 }
446 
447 static struct m0_rpc_machine *
449 {
450  return container_of(tm, struct m0_rpc_machine, rm_tm);
451 }
452 
453 static int rpc_tm_setup(struct m0_net_transfer_mc *tm,
454  struct m0_net_domain *net_dom,
455  const char *ep_addr,
456  struct m0_net_buffer_pool *pool,
457  uint32_t colour,
458  m0_bcount_t msg_size,
459  uint32_t qlen)
460 {
461  struct m0_clink tmwait;
462  int rc;
463  uint32_t min_recv_size;
464 
465  M0_ENTRY("tm: %p, net_dom: %p, ep_addr: %s", tm, net_dom,
466  (char *)ep_addr);
467  M0_PRE(tm != NULL && net_dom != NULL && ep_addr != NULL);
468  M0_PRE(pool != NULL);
469 
472 
473  rc = m0_net_tm_init(tm, net_dom);
474  if (rc < 0)
475  return M0_ERR_INFO(rc, "TM initialization");
476 
477  if (net_dom->nd_xprt == &m0_net_lnet_xprt)
478  min_recv_size = m0_rpc_max_msg_size(net_dom, msg_size);
479  else
480  min_recv_size = 1;
481 
483  min_recv_size,
484  m0_rpc_max_recv_msgs(net_dom, msg_size),
485  qlen);
486  if (rc < 0) {
487  m0_net_tm_fini(tm);
488  return M0_ERR_INFO(rc, "m0_net_tm_pool_attach");
489  }
490 
491  m0_net_tm_colour_set(tm, colour);
492 
493  /* Start the transfer machine so that users of this rpc_machine
494  can send/receive messages. */
495  m0_clink_init(&tmwait, NULL);
496  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
497 
498  rc = m0_net_tm_start(tm, ep_addr);
499  if (rc == 0) {
500  while (tm->ntm_state != M0_NET_TM_STARTED &&
502  m0_chan_wait(&tmwait);
503  }
504  m0_clink_del_lock(&tmwait);
505  m0_clink_fini(&tmwait);
506 
507  if (tm->ntm_state == M0_NET_TM_FAILED) {
511  rc = rc ?: -ENETUNREACH;
512  m0_net_tm_fini(tm);
513  return M0_ERR_INFO(rc, "TM start");
514  }
515  return M0_RC(rc);
516 }
517 
519 {
520  int rc;
521  struct m0_clink tmwait;
522  struct m0_net_transfer_mc *tm = &machine->rm_tm;
523 
524  M0_ENTRY("machine: %p", machine);
525  M0_PRE(machine != NULL);
526 
527  m0_clink_init(&tmwait, NULL);
528  m0_clink_add_lock(&tm->ntm_chan, &tmwait);
529 
530  rc = m0_net_tm_stop(tm, true);
531 
532  if (rc < 0) {
533  m0_clink_del_lock(&tmwait);
534  m0_clink_fini(&tmwait);
535  M0_LOG(M0_ERROR, "TM stopping: FAILED with err: %d", rc);
536  M0_LEAVE();
537  return;
538  }
539  /* Wait for transfer machine to stop. */
540  while (tm->ntm_state != M0_NET_TM_STOPPED &&
542  m0_chan_wait(&tmwait);
543  m0_clink_del_lock(&tmwait);
544  m0_clink_fini(&tmwait);
545 
546  /* Fini the transfer machine here and deallocate the chan. */
547  m0_net_tm_fini(tm);
548  M0_LEAVE();
549 }
550 
551 M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
552 {
553  M0_PRE(machine != NULL);
556 }
557 
559 {
560  M0_PRE(machine != NULL);
563 }
564 
565 M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
566 {
567  M0_PRE(machine != NULL);
569 }
570 M0_EXPORTED(m0_rpc_machine_is_locked);
571 
572 M0_INTERNAL bool
574 {
575  M0_PRE(machine != NULL);
577 }
578 M0_EXPORTED(m0_rpc_machine_is_not_locked);
579 
581  struct m0_rpc_stats *stats, bool reset)
582 {
583  M0_PRE(machine != NULL);
585  M0_PRE(stats != NULL);
586 
587  *stats = machine->rm_stats;
588  if (reset)
590 }
591 
593  struct m0_rpc_stats *stats, bool reset)
594 {
595  M0_PRE(machine != NULL);
596 
600 }
601 M0_EXPORTED(m0_rpc_machine_get_stats);
602 
/*
 * Returns the address string (nep_addr) of the end point attached to the rpc
 * machine's transfer machine (rm_tm.ntm_ep).
 *
 * No NULL checks are made: both rmach and rmach->rm_tm.ntm_ep are
 * dereferenced unconditionally.
 * NOTE(review): presumably ntm_ep is only set once the transfer machine has
 * been started -- confirm against the net layer before calling earlier.
 */
603 M0_INTERNAL const char *m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
604 {
605  return rmach->rm_tm.ntm_ep->nep_addr;
606 }
607 
608 M0_INTERNAL void m0_rpc_machine_add_conn(struct m0_rpc_machine *rmach,
609  struct m0_rpc_conn *conn)
610 {
611  struct m0_rpc_machine_watch *watch;
612  struct m0_tl *tlist;
613 
614  M0_ENTRY("rmach: %p conn: %p", rmach, conn);
615 
617  M0_PRE(conn != NULL && !rpc_conn_tlink_is_in(conn));
619  !(conn->c_flags & RCF_RECV_END)));
620 
621  tlist = (conn->c_flags & RCF_SENDER_END) ? &rmach->rm_outgoing_conns :
622  &rmach->rm_incoming_conns;
623  rpc_conn_tlist_add(tlist, conn);
624  M0_LOG(M0_DEBUG, "rmach %p conn %p added to %s list", rmach, conn,
625  (conn->c_flags & RCF_SENDER_END) ? "outgoing" : "incoming");
626  m0_tl_for(rmach_watch, &rmach->rm_watch, watch) {
627  if (watch->mw_conn_added != NULL)
628  watch->mw_conn_added(watch, conn);
629  } m0_tl_endfor;
630 
631  M0_POST(rpc_conn_tlink_is_in(conn));
632  M0_LEAVE();
633 }
634 
635 M0_INTERNAL struct m0_rpc_chan *rpc_chan_get(struct m0_rpc_machine *machine,
636  struct m0_net_end_point *dest_ep,
637  uint64_t max_packets_in_flight)
638 {
639  struct m0_rpc_chan *chan;
640 
641  M0_ENTRY("machine: %p, max_packets_in_flight: %llu", machine,
642  (unsigned long long)max_packets_in_flight);
643  M0_PRE(dest_ep != NULL);
645 
646  if (M0_FI_ENABLED("fake_error"))
647  return NULL;
648 
649  if (M0_FI_ENABLED("do_nothing"))
650  return (struct m0_rpc_chan *)1;
651 
652  chan = rpc_chan_locate(machine, dest_ep);
653  if (chan == NULL)
654  rpc_chan_create(&chan, machine, dest_ep, max_packets_in_flight);
655 
656  M0_LEAVE("chan: %p", chan);
657  return chan;
658 }
659 
661  struct m0_net_end_point *dest_ep)
662 {
663  struct m0_rpc_chan *chan;
664  bool found;
665 
666  M0_ENTRY("machine: %p, dest_ep_addr: %s", machine,
667  (char *)dest_ep->nep_addr);
668  M0_PRE(dest_ep != NULL);
670 
671  found = false;
672  /* Locate the chan from rpc_machine->chans list. */
673  m0_tl_for(rpc_chan, &machine->rm_chans, chan) {
674  M0_ASSERT(chan->rc_destep->nep_tm->ntm_dom ==
675  dest_ep->nep_tm->ntm_dom);
676  if (chan->rc_destep == dest_ep) {
677  m0_ref_get(&chan->rc_ref);
678  m0_net_end_point_get(chan->rc_destep);
679  found = true;
680  break;
681  }
682  } m0_tl_endfor;
683 
684  M0_LEAVE("rc: %p", found ? chan : NULL);
685  return found ? chan : NULL;
686 }
687 
688 static int rpc_chan_create(struct m0_rpc_chan **chan,
689  struct m0_rpc_machine *machine,
690  struct m0_net_end_point *dest_ep,
691  uint64_t max_packets_in_flight)
692 {
694  struct m0_net_domain *ndom;
695  struct m0_rpc_chan *ch;
696 
697  M0_ENTRY("machine: %p", machine);
698  M0_PRE(chan != NULL);
699  M0_PRE(dest_ep != NULL);
700 
702 
703  M0_ALLOC_PTR(ch);
704  if (ch == NULL) {
705  *chan = NULL;
706  return M0_ERR(-ENOMEM);
707  }
708 
709  ch->rc_rpc_machine = machine;
710  ch->rc_destep = dest_ep;
711  m0_ref_init(&ch->rc_ref, 1, rpc_chan_ref_release);
712  m0_net_end_point_get(dest_ep);
713 
714  ndom = machine->rm_tm.ntm_dom;
715 
716  constraints.fc_max_nr_packets_enqed = max_packets_in_flight;
722 
724  rpc_chan_tlink_init_at(ch, &machine->rm_chans);
725  *chan = ch;
726  return M0_RC(0);
727 }
728 
729 M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan)
730 {
731  struct m0_rpc_machine *machine;
732 
733  M0_ENTRY();
734  M0_PRE(chan != NULL);
735 
736  if (M0_FI_ENABLED("do_nothing"))
737  return;
738 
739  machine = chan->rc_rpc_machine;
741 
742  m0_net_end_point_put(chan->rc_destep);
743  m0_ref_put(&chan->rc_ref);
744  M0_LEAVE();
745 }
746 
/*
 * Release callback for the rc_ref reference counter of struct m0_rpc_chan;
 * rpc_chan_create() registers it through m0_ref_init().
 *
 * Removes the channel from the list it is linked into, finalises its rc_frm
 * with m0_rpc_frm_fini() and frees the channel memory.
 *
 * Asserts that the owning rpc machine (rc_rpc_machine) is locked.
 */
747 static void rpc_chan_ref_release(struct m0_ref *ref)
748 {
749  struct m0_rpc_chan *chan;
750 
751  M0_ENTRY();
752  M0_PRE(ref != NULL);
753 
 /* Recover the channel that embeds this reference counter. */
754  chan = container_of(ref, struct m0_rpc_chan, rc_ref);
755  M0_ASSERT(chan != NULL);
756  M0_ASSERT(m0_rpc_machine_is_locked(chan->rc_rpc_machine));
757 
 /* Unlink first, then tear down and free; chan is invalid afterwards. */
758  rpc_chan_tlist_del(chan);
759  m0_rpc_frm_fini(&chan->rc_frm);
760  m0_free(chan);
761  M0_LEAVE();
762 }
763 
764 static void buf_recv_cb(const struct m0_net_buffer_event *ev)
765 {
766  struct m0_net_buffer *nb;
767  bool buf_is_queued;
768 
769  M0_ENTRY();
770  nb = ev->nbe_buffer;
771  M0_PRE(nb != NULL);
772 
773  if (ev->nbe_status == 0) {
775  ev->nbe_ep);
776  } else {
777  if (ev->nbe_status != -ECANCELED)
778  net_buf_err(nb, ev->nbe_status);
779  }
780  buf_is_queued = (nb->nb_flags & M0_NET_BUF_QUEUED);
781  if (!buf_is_queued)
783  M0_LEAVE();
784 }
785 
786 static void net_buf_received(struct m0_net_buffer *nb,
788  m0_bcount_t length,
789  struct m0_net_end_point *from_ep)
790 {
791  struct m0_rpc_machine *machine;
792  struct m0_rpc_packet p;
793  int rc;
794 
795  M0_ENTRY("net_buf: %p, offset: %llu, length: %llu,"
796  "ep_addr: %s", nb, (unsigned long long)offset,
797  (unsigned long long)length, (char *)from_ep->nep_addr);
798 
801  rc = m0_rpc_packet_decode(&p, &nb->nb_buffer, offset, length);
802  if (rc != 0)
803  M0_LOG(M0_ERROR, "Packet decode error: %i.", rc);
804  /* There might be items in packet p, which were successfully decoded
805  before an error occurred. */
806  packet_received(&p, machine, from_ep);
808  M0_LEAVE();
809 }
810 
811 static void packet_received(struct m0_rpc_packet *p,
812  struct m0_rpc_machine *machine,
813  struct m0_net_end_point *from_ep)
814 {
815  struct m0_rpc_item *item;
816 
817  M0_ENTRY("p %p", p);
818 
820  machine->rm_stats.rs_nr_rcvd_bytes += p->rp_size;
821  /* packet p can also be empty */
827  item_received(item, from_ep);
831 
832  M0_LEAVE();
833 }
834 
835 M0_INTERNAL struct m0_rpc_conn *
837  const struct m0_rpc_item *item)
838 {
839  const struct m0_rpc_item_header2 *header;
840  const struct m0_tl *conn_list;
841  struct m0_rpc_conn *conn;
842  bool use_uuid;
843 
844  M0_ENTRY("machine: %p, item: %p", machine, item);
845 
846  header = &item->ri_header;
847  use_uuid = (header->osr_sender_id == SENDER_ID_INVALID);
848  conn_list = m0_rpc_item_is_request(item) ?
851 
852  m0_tl_for(rpc_conn, conn_list, conn) {
853  if (use_uuid) {
854  if (m0_uint128_cmp(&conn->c_uuid,
855  &header->osr_uuid) == 0)
856  break;
857  } else if (conn->c_sender_id == header->osr_sender_id) {
858  break;
859  }
860  } m0_tl_endfor;
861 
862  M0_LEAVE("item=%p conn=%p use_uuid=%d header->osr_uuid="U128X_F" "
863  "header->osr_sender_id=%"PRIu64, item, conn, !!use_uuid,
864  U128_P(&header->osr_uuid), header->osr_sender_id);
865  return conn;
866 }
867 
/*
 * Global hook: pointer to a function taking the affected item. It is not
 * initialised here.
 * NOTE(review): presumably invoked from the drop path of item_received();
 * the lines that would show the call are missing from this listing --
 * confirm.
 */
868 M0_INTERNAL void (*m0_rpc__item_dropped)(struct m0_rpc_item *item);
/* Fault-injection filter, defined below; true means "drop this item". */
869 static bool item_received_fi(struct m0_rpc_item *item);
870 
871 static void item_received(struct m0_rpc_item *item,
872  struct m0_net_end_point *from_ep)
873 {
875  int rc;
876 
877  M0_ENTRY("machine: %p, item: %p[%s/%u] size %zu xid=%"PRIu64
878  " from ep_addr: %s, oneway = %d",
881  (char *)from_ep->nep_addr, !!m0_rpc_item_is_oneway(item));
882 
883  if (item_received_fi(item))
884  return;
885 
888 
890 
893  if (rc == 0) {
894  /* Duplicate request can be replied from the cache. */
895  if (!M0_IN(item->ri_sm.sm_state, (M0_RPC_ITEM_FAILED,
898  } else {
899  M0_LOG(M0_DEBUG, "%p [%s/%d] dropped", item, item_kind(item),
902  if (M0_FI_ENABLED("drop_signal") &&
905  }
906 
907  M0_LEAVE();
908 }
909 
/*
 * Table of four opcodes, all zero initially.
 * NOTE(review): presumably consulted by the "drop_opcode" fault-injection
 * branch of item_received_fi(); the lines that would show the lookup are
 * missing from this listing -- confirm.
 */
910 uint32_t m0_rpc__filter_opcode[4] = {};
911 
912 static bool item_received_fi(struct m0_rpc_item *item)
913 {
914  uint32_t opcode = item->ri_type->rit_opcode;
915 
916  if (M0_FI_ENABLED("drop_item")) {
917  M0_LOG(M0_DEBUG, "%p[%s/%u] dropped", item,
918  item_kind(item), opcode);
919  return true;
920  }
921  if (m0_rpc_item_is_reply(item) && M0_FI_ENABLED("drop_item_reply")) {
922  M0_LOG(M0_DEBUG, "%p[%s/%u] dropped", item,
923  item_kind(item), opcode);
924  return true;
925  }
926  /*
927  * setattr and getattr cases below are invoked via
928  * /sys/kernel/debug/motr/finject/ctl by ST and because of this have to
929  * be separate from more generic "drop_opcode".
930  */
931  if ((M0_FI_ENABLED("drop_setattr_item_reply") &&
933  (M0_FI_ENABLED("drop_getattr_item_reply") &&
935  M0_LOG(M0_DEBUG, "%p[%s/%u] [sg]etattr reply dropped",
937  return true;
938  }
939  if (M0_FI_ENABLED("drop_opcode") &&
942  M0_LOG(M0_DEBUG, "%p[%s/%u] dropped",
944  return true;
945  }
946  return false;
947 }
948 
/*
 * Error hook for a receive buffer event. buf_recv_cb() calls it when the
 * event status is non-zero and is not -ECANCELED.
 *
 * Currently a placeholder: the body is empty, so nb and status are ignored.
 */
949 static void net_buf_err(struct m0_net_buffer *nb, int32_t status)
950 {
951 }
952 
953 /* Put buffer back into the pool */
955 {
956  struct m0_net_transfer_mc *tm;
957 
958  M0_ENTRY("net_buf: %p", nb);
959  M0_PRE(nb != NULL);
960  tm = nb->nb_tm;
961 
962  M0_PRE(tm != NULL);
963  M0_PRE(tm->ntm_recv_pool != NULL && nb->nb_pool != NULL);
964  M0_PRE(tm->ntm_recv_pool == nb->nb_pool);
965 
966  nb->nb_ep = NULL;
969  tm->ntm_pool_colour);
971 
972  M0_LEAVE();
973 }
974 
975 /*
976  * RPC machine watch.
977  */
978 
980 {
981  struct m0_rpc_machine *rmach;
982 
983  M0_ENTRY("watch: %p", watch);
984  M0_PRE(watch != NULL);
985 
986  rmach = watch->mw_mach;
987  M0_PRE(rmach != NULL);
989 
990  m0_rpc_machine_lock(rmach);
991  rmach_watch_tlink_init_at_tail(watch, &rmach->rm_watch);
992  m0_rpc_machine_unlock(rmach);
993 
994  M0_LEAVE();
995 }
996 
998 {
999  struct m0_rpc_machine *rmach;
1000 
1001  M0_ENTRY("watch: %p", watch);
1002  M0_PRE(watch != NULL);
1003 
1004  rmach = watch->mw_mach;
1005  M0_PRE(rmach != NULL);
1007 
1008  m0_rpc_machine_lock(rmach);
1009  if (rmach_watch_tlink_is_in(watch))
1010  rmach_watch_tlink_del_fini(watch);
1011  m0_rpc_machine_unlock(rmach);
1012 
1013  M0_LEAVE();
1014 }
1015 
1016 #undef M0_TRACE_SUBSYSTEM
1017 
1020 /*
1021  * Local variables:
1022  * c-indentation-style: "K&R"
1023  * c-basic-offset: 8
1024  * tab-width: 8
1025  * fill-column: 80
1026  * scroll-step: 1
1027  * End:
1028  */
M0_INTERNAL int m0_uint128_cmp(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:45
M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan)
Definition: rpc_machine.c:729
struct m0_tl rm_chans
Definition: rpc_machine.h:86
M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
Definition: formation2.c:222
uint32_t rit_opcode
Definition: item.h:474
struct m0_rpc_machine * mw_mach
Definition: rpc_machine.h:234
static struct m0_addb2_philter p
Definition: consumer.c:40
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
static int __rpc_machine_init(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:177
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
void m0_rpc_machine_fini(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:233
struct m0_chan rm_nb_idle
Definition: rpc_machine.h:141
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: tm.c:261
struct m0_net_buffer_pool * nb_pool
Definition: net.h:1508
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:728
struct m0_rpc_conn * h_conn
Definition: rpc_machine.c:275
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
static void item_received(struct m0_rpc_item *item, struct m0_net_end_point *from_ep)
Definition: rpc_machine.c:871
void(* mw_conn_added)(struct m0_rpc_machine_watch *w, struct m0_rpc_conn *conn)
Definition: rpc_machine.h:244
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL uint32_t m0_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: rpc.c:312
M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
Definition: service.c:300
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
static const char * ep_addr
Definition: rpc_machine.c:35
const struct m0_net_xprt m0_net_lnet_xprt
Definition: lnet_xo.c:679
size_t ri_size
Definition: item.h:198
M0_INTERNAL int m0_rpc_packet_decode(struct m0_rpc_packet *p, struct m0_bufvec *bufvec, m0_bindex_t off, m0_bcount_t len)
Definition: packet.c:367
M0_INTERNAL struct m0_rpc_conn * m0_rpc_machine_find_conn(const struct m0_rpc_machine *machine, const struct m0_rpc_item *item)
Definition: rpc_machine.c:836
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static void machine_nb_idle(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:217
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:203
struct m0_tl rm_outgoing_conns
Definition: rpc_machine.h:95
struct m0_clink s_clink
Definition: sm.h:516
enum m0_net_tm_state ntm_state
Definition: net.h:819
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
#define M0_ADDB2_PUSH(id,...)
Definition: addb2.h:261
struct m0_sm ri_sm
Definition: item.h:181
uint64_t nb_flags
Definition: net.h:1489
struct m0_net_domain * ntm_dom
Definition: net.h:853
static struct m0_rpc_frm_constraints constraints
Definition: formation2.c:35
uint64_t m0_bindex_t
Definition: types.h:80
#define m0_exists(var, nr,...)
Definition: misc.h:134
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
Definition: item.c:356
M0_INTERNAL int m0_rpc_rcv_conn_terminate(struct m0_rpc_conn *conn)
Definition: conn.c:1217
const char * nep_addr
Definition: net.h:503
m0_bindex_t nbe_offset
Definition: net.h:1238
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
static struct m0_net_tm_callbacks m0_rpc_tm_callbacks
Definition: rpc_machine.c:119
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_tl c_item_sources
Definition: conn.h:314
M0_INTERNAL int m0_net_tm_pool_attach(struct m0_net_transfer_mc *tm, struct m0_net_buffer_pool *bufpool, const struct m0_net_buffer_callbacks *callbacks, m0_bcount_t min_recv_size, uint32_t max_recv_msgs, uint32_t min_recv_queue_len)
Definition: tm.c:459
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
static void rpc_tm_cleanup(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:518
const char * bt_name
Definition: bob.h:73
struct m0_ref rc_ref
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
Definition: item.c:104
struct m0_sm c_sm
Definition: conn.h:322
return M0_RC(rc)
m0_time_t ri_rpc_time
Definition: item.h:202
#define equi(a, b)
Definition: misc.h:297
struct m0_chan ntm_chan
Definition: net.h:874
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL int m0_pagesize_get(void)
Definition: memory.c:233
uint64_t osr_xid
Definition: onwire.h:105
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
M0_INTERNAL void m0_rpc_packet_init(struct m0_rpc_packet *p, struct m0_rpc_machine *rmach)
Definition: packet.c:103
int opcode
Definition: crate.c:301
static int rpc_tm_setup(struct m0_net_transfer_mc *tm, struct m0_net_domain *net_dom, const char *ep_addr, struct m0_net_buffer_pool *pool, uint32_t colour, m0_bcount_t msg_size, uint32_t qlen)
Definition: rpc_machine.c:453
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
void m0_rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
Definition: rpc_machine.c:592
int i
Definition: dir.c:1033
M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm, struct m0_rpc_frm_constraints *constraints, const struct m0_rpc_frm_ops *ops)
Definition: formation2.c:197
M0_INTERNAL void m0_rpc_packet_fini(struct m0_rpc_packet *p)
Definition: packet.c:122
#define PRIu64
Definition: types.h:58
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
bool m0_time_is_in_past(m0_time_t t)
Definition: time.c:102
int32_t nbe_status
Definition: net.h:1218
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
m0_bcount_t rm_bulk_cutoff
Definition: rpc_machine.h:157
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
Definition: tm.c:160
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
uint64_t c_sender_id
Definition: conn.h:269
Definition: trace.h:482
Definition: refs.h:34
struct m0_clink h_clink
Definition: rpc_machine.c:276
uint64_t c_flags
Definition: conn.h:275
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
Definition: item.c:704
uint64_t h_sender_id
Definition: rpc_machine.c:277
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
uint64_t rs_nr_rcvd_bytes
Definition: rpc_machine.h:74
#define M0_ASSERT(cond)
static void packet_received(struct m0_rpc_packet *p, struct m0_rpc_machine *machine, struct m0_net_end_point *from_ep)
Definition: rpc_machine.c:811
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
Definition: tm.c:204
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL void m0_rpc_fop_conn_establish_ctx_init(struct m0_rpc_item *item, struct m0_net_end_point *ep)
Definition: session_fops.c:209
const struct m0_net_xprt * nd_xprt
Definition: net.h:396
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
M0_TL_DESCR_DEFINE(rpc_chan, "rpc_channels", static, struct m0_rpc_chan, rc_linkage, rc_magic, M0_RPC_CHAN_MAGIC, M0_RPC_CHAN_HEAD_MAGIC)
#define U128_P(x)
Definition: types.h:45
struct m0_tl rm_watch
Definition: rpc_machine.h:124
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
m0_time_t m0_time_now(void)
Definition: time.c:134
M0_INTERNAL m0_bcount_t m0_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: rpc.c:302
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
Definition: tlist.h:251
uint32_t rm_min_recv_size
Definition: rpc_machine.h:148
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
static void rpc_chan_ref_release(struct m0_ref *ref)
Definition: rpc_machine.c:747
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:186
M0_INTERNAL int m0_rpc_machine_init(struct m0_rpc_machine *machine, struct m0_net_domain *net_dom, const char *ep_addr, struct m0_reqh *reqh, struct m0_net_buffer_pool *receive_pool, uint32_t colour, m0_bcount_t msg_size, uint32_t queue_len)
Definition: rpc_machine.c:123
uint64_t rs_nr_dropped_items
Definition: rpc_machine.h:59
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
void m0_rpc_machine_watch_attach(struct m0_rpc_machine_watch *watch)
Definition: rpc_machine.c:979
static void net_buf_received(struct m0_net_buffer *nb, m0_bindex_t offset, m0_bcount_t length, struct m0_net_end_point *from_ep)
Definition: rpc_machine.c:786
#define M0_POST(cond)
Definition: reqh.h:94
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
Definition: net.h:1272
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
Definition: conn.c:1258
static m0_bindex_t offset
Definition: dump.c:173
struct m0_net_transfer_mc * nep_tm
Definition: net.h:493
struct m0_rpc_conn conn
Definition: fsync.c:96
void m0_rpc_machine_watch_detach(struct m0_rpc_machine_watch *watch)
Definition: rpc_machine.c:997
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_mutex s_lock
Definition: sm.h:514
struct m0_tl rm_incoming_conns
Definition: rpc_machine.h:94
struct m0_rpc_machine machine
Definition: mdstore.c:58
static void __rpc_machine_fini(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:199
M0_INTERNAL void m0_net_tm_colour_set(struct m0_net_transfer_mc *tm, uint32_t colour)
Definition: tm.c:436
void m0_addb2_pop(uint64_t id)
Definition: addb2.c:440
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
Definition: tm.c:293
struct m0_thread rm_worker
Definition: rpc_machine.h:129
M0_INTERNAL void m0_rpc_conn_cleanup_all_sessions(struct m0_rpc_conn *conn)
Definition: conn.c:1174
static struct m0_pool pool
Definition: iter_ut.c:58
M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
Definition: item.c:516
struct m0_uint128 c_uuid
Definition: conn.h:272
#define U128X_F
Definition: types.h:42
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
M0_INTERNAL void m0_rpc_conn_terminate_reply_sent(struct m0_rpc_conn *conn)
Definition: conn.c:1237
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
void(* mw_mach_terminated)(struct m0_rpc_machine_watch *w)
Definition: rpc_machine.h:260
struct m0_sm_ast c_ast
Definition: conn.h:330
M0_INTERNAL bool m0_rpc_machine_is_not_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:573
uint32_t m0_rpc__filter_opcode[4]
Definition: rpc_machine.c:910
struct m0_reqh reqh
Definition: rm_foms.c:48
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
static bool rpc_conn__on_finalised_cb(struct m0_clink *clink)
Definition: rpc_machine.c:280
#define M0_CNT_INC(cnt)
Definition: arith.h:226
static struct m0_chan chan[RDWR_REQUEST_MAX]
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
struct m0_net_end_point * ntm_ep
Definition: net.h:868
m0_bcount_t fc_max_nr_bytes_accumulated
static struct m0_rpc_chan * rpc_chan_locate(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep)
Definition: rpc_machine.c:660
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
Definition: service.c:292
M0_INTERNAL void m0_rpc_machine_drain_item_sources(struct m0_rpc_machine *machine, uint32_t max_per_source)
Definition: rpc_machine.c:404
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
Definition: beck.c:130
M0_BOB_DEFINE(, &rpc_machine_bob_type, m0_rpc_machine)
static void net_buf_err(struct m0_net_buffer *nb, int32_t status)
Definition: rpc_machine.c:949
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
M0_INTERNAL void m0_rpc_packet_remove_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
Definition: packet.c:165
M0_INTERNAL int32_t m0_net_domain_get_max_buffer_segments(struct m0_net_domain *dom)
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
#define for_each_item_in_packet(item, packet)
M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
Definition: chan.c:104
M0_INTERNAL void m0_net_buffer_pool_put(struct m0_net_buffer_pool *pool, struct m0_net_buffer *buf, uint32_t colour)
Definition: buffer_pool.c:243
#define M0_MKTIME(secs, ns)
Definition: time.h:86
static uint64_t found
Definition: base.c:376
uint64_t rm_active_nb
Definition: rpc_machine.h:139
M0_INTERNAL struct m0_rpc_chan * rpc_chan_get(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_packets_in_flight)
Definition: rpc_machine.c:635
M0_INTERNAL void m0_rpc_machine_add_conn(struct m0_rpc_machine *rmach, struct m0_rpc_conn *conn)
Definition: rpc_machine.c:608
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:183
uint32_t ntm_pool_colour
Definition: net.h:921
static struct m0_rpc_machine * tm_to_rpc_machine(const struct m0_net_transfer_mc *tm)
Definition: rpc_machine.c:448
M0_INTERNAL void rpc_worker_thread_fn(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:379
static void buf_recv_cb(const struct m0_net_buffer_event *ev)
Definition: rpc_machine.c:764
struct m0_rpc_stats rm_stats
Definition: rpc_machine.h:96
M0_INTERNAL void m0_rpc_oneway_item_post_locked(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:184
static void rpc_tm_event_cb(const struct m0_net_tm_event *ev)
Definition: rpc_machine.c:110
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
M0_INTERNAL void m0_rpc_machine_cleanup_incoming_connections(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:300
static void __rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
Definition: rpc_machine.c:580
M0_INTERNAL void m0_sm_asts_run(struct m0_sm_group *grp)
Definition: sm.c:150
static void rpc_recv_pool_buffer_put(struct m0_net_buffer *nb)
Definition: rpc_machine.c:954
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
Definition: net.c:93
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL int m0_rpc_item_received(struct m0_rpc_item *item, struct m0_rpc_machine *machine)
Definition: item.c:1208
static int rpc_chan_create(struct m0_rpc_chan **chan, struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_packets_in_flight)
Definition: rpc_machine.c:688
static const struct m0_net_buffer_callbacks rpc_buf_recv_cb
Definition: rpc_machine.c:89
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
static uint64_t m0_align(uint64_t val, uint64_t alignment)
Definition: arith.h:170
const struct m0_net_tm_callbacks * ntm_callbacks
Definition: net.h:816
const struct m0_rpc_frm_ops m0_rpc_frm_default_ops
Definition: frmops.c:65
uint64_t rs_nr_rcvd_packets
Definition: rpc_machine.h:68
M0_INTERNAL void(* m0_rpc__item_dropped)(struct m0_rpc_item *item)
Definition: rpc_machine.c:868
struct m0_reqh * rm_reqh
Definition: rpc_machine.h:105
#define end_for_each_item_in_packet
static bool item_received_fi(struct m0_rpc_item *item)
Definition: rpc_machine.c:912
M0_TL_DEFINE(rpc_chan, static, struct m0_rpc_chan)
static int conn_state(const struct m0_rpc_conn *conn)
Definition: conn_internal.h:66
struct m0_net_end_point * nb_ep
Definition: net.h:1424
m0_time_t ri_deadline
Definition: item.h:141
static const struct m0_bob_type rpc_machine_bob_type
Definition: rpc_machine.c:77
struct m0_net_buffer_pool * ntm_recv_pool
Definition: net.h:896