Motr  M0
item.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 #include "lib/tlist.h"
28 #include "lib/rwlock.h"
29 #include "lib/misc.h"
30 #include "lib/errno.h"
31 #include "lib/finject.h"
32 #include "conf/obj.h" /* m0_conf_fid_type */
33 #include "ha/epoch.h"
34 #include "ha/note.h"
35 #include "motr/magic.h"
36 #include "addb2/addb2.h"
37 #include "rpc/addb2.h"
38 #include "rpc/rpc_internal.h"
39 #include "rpc/rpc_opcodes_xc.h" /* m0_xc_M0_RPC_OPCODES_enum */
40 #include "motr/iem.h"
41 
48 static int item_entered_in_urgent_state(struct m0_sm *mach);
49 static void item_timer_cb(struct m0_sm_timer *timer);
50 static void item_timedout(struct m0_rpc_item *item);
51 static void item_resend(struct m0_rpc_item *item);
52 static int item_reply_received(struct m0_rpc_item *reply,
53  struct m0_rpc_item **req_out);
54 static bool item_reply_received_fi(struct m0_rpc_item *req,
55  struct m0_rpc_item *reply);
56 static int req_replied(struct m0_rpc_item *req, struct m0_rpc_item *reply);
57 static void rpc_item_xid_unassign(struct m0_rpc_item *item);
58 
61 
62 M0_TL_DESCR_DEFINE(rpcitem, "rpc item tlist", M0_INTERNAL, struct m0_rpc_item,
63  ri_field, ri_magic, M0_RPC_ITEM_MAGIC,
65 M0_TL_DEFINE(rpcitem, M0_INTERNAL, struct m0_rpc_item);
66 
67 M0_TL_DESCR_DEFINE(rit, "rpc_item_type_descr", static, struct m0_rpc_item_type,
68  rit_linkage, rit_magic, M0_RPC_ITEM_TYPE_MAGIC,
70 M0_TL_DEFINE(rit, static, struct m0_rpc_item_type);
71 
72 M0_TL_DESCR_DEFINE(ric, "rpc item cache", M0_INTERNAL,
73  struct m0_rpc_item, ri_cache_link, ri_magic,
75 M0_TL_DEFINE(ric, M0_INTERNAL, struct m0_rpc_item);
76 
77 M0_TL_DESCR_DEFINE(pending_item, "pending-item-list", M0_INTERNAL,
78  struct m0_rpc_item, ri_pending_link, ri_magic,
80 M0_TL_DEFINE(pending_item, M0_INTERNAL, struct m0_rpc_item);
81 
82 M0_TL_DESCR_DEFINE(xidl, "rpc session xid list", M0_INTERNAL,
83  struct m0_rpc_item, ri_xid_link, ri_magic,
85 M0_TL_DEFINE(xidl, M0_INTERNAL, struct m0_rpc_item);
86 
88 static struct m0_tl rpc_item_types_list;
90 
97 static bool opcode_is_dup(uint32_t opcode)
98 {
99  M0_PRE(opcode > 0);
100 
102 }
103 
106 
107 #define HEADER1_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_header1_xc, ptr)
108 #define HEADER2_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_header2_xc, ptr)
109 #define FOOTER_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_footer_xc, ptr)
110 
111 M0_INTERNAL int m0_rpc_item_module_init(void)
112 {
113  int h1_len;
114  int h2_len;
115  int f_len;
116  void *dummy = NULL;
117  struct m0_xcode_ctx h_xc;
118 
119  M0_ENTRY();
120 
122  rit_tlist_init(&rpc_item_types_list);
123 
125  h1_len = m0_xcode_length(&h_xc);
127  h2_len = m0_xcode_length(&h_xc);
129  f_len = m0_xcode_length(&h_xc);
130 
131  m0_rpc_item_onwire_header_size = h1_len + h2_len;
133 
134  return M0_RC(0);
135 }
136 
137 M0_INTERNAL void m0_rpc_item_module_fini(void)
138 {
139  struct m0_rpc_item_type *item_type;
140 
141  M0_ENTRY();
142 
144  m0_tl_for(rit, &rpc_item_types_list, item_type) {
145  rit_tlink_del_fini(item_type);
146  } m0_tl_endfor;
147  rit_tlist_fini(&rpc_item_types_list);
150 
151  M0_LEAVE();
152 }
153 
154 M0_INTERNAL void m0_rpc_item_type_register(struct m0_rpc_item_type *item_type)
155 {
156  uint64_t dir_flag;
157  M0_ENTRY("item_type: %p, item_opcode: %s (%u)", item_type,
158  m0_rpc_item_type_name(item_type), item_type->rit_opcode);
159  M0_PRE(item_type != NULL);
160  dir_flag = item_type->rit_flags & (M0_RPC_ITEM_TYPE_REQUEST |
162  M0_PRE(!opcode_is_dup(item_type->rit_opcode));
163  M0_PRE(m0_is_po2(dir_flag));
165  dir_flag == M0_RPC_ITEM_TYPE_REQUEST));
166 
170  rit_tlink_init_at(item_type, &rpc_item_types_list);
172 
173  M0_LEAVE();
174 }
175 
176 M0_INTERNAL void m0_rpc_item_type_deregister(struct m0_rpc_item_type *item_type)
177 {
178  M0_ENTRY("item_type: %p", item_type);
179  M0_PRE(item_type != NULL);
180 
182  rit_tlink_del_fini(item_type);
183  item_type->rit_magic = 0;
185 
186  M0_LEAVE();
187 }
188 
189 M0_INTERNAL struct m0_rpc_item_type *m0_rpc_item_type_lookup(uint32_t opcode)
190 {
191  struct m0_rpc_item_type *item_type;
192 
193  M0_ENTRY("opcode: %u", opcode);
194 
196  item_type = m0_tl_find(rit, item_type, &rpc_item_types_list,
197  item_type->rit_opcode == opcode);
199 
200  M0_POST(ergo(item_type != NULL, item_type->rit_opcode == opcode));
201  M0_LEAVE("item_type: %p", item_type);
202  return item_type;
203 }
204 
208  .sd_name = "UNINITIALISED",
209  .sd_allowed = 0,
210  },
212  .sd_flags = M0_SDF_INITIAL | M0_SDF_FINAL,
213  .sd_name = "INITIALISED",
214  .sd_allowed = M0_BITS(M0_RPC_ITEM_ENQUEUED,
219  },
221  .sd_name = "ENQUEUED",
222  .sd_allowed = M0_BITS(M0_RPC_ITEM_SENDING,
226  },
227  [M0_RPC_ITEM_URGENT] = {
228  .sd_name = "URGENT",
230  .sd_allowed = M0_BITS(M0_RPC_ITEM_SENDING,
233  },
234  [M0_RPC_ITEM_SENDING] = {
235  .sd_name = "SENDING",
237  },
238  [M0_RPC_ITEM_SENT] = {
239  .sd_flags = M0_SDF_FINAL,
240  .sd_name = "SENT",
242  M0_RPC_ITEM_ENQUEUED,/*only reply items*/
246  },
248  .sd_name = "WAITING_FOR_REPLY",
249  .sd_allowed = M0_BITS(M0_RPC_ITEM_REPLIED,
253  },
254  [M0_RPC_ITEM_REPLIED] = {
255  .sd_flags = M0_SDF_FINAL,
256  .sd_name = "REPLIED",
257  .sd_allowed = M0_BITS(M0_RPC_ITEM_UNINITIALISED,
261  },
262  [M0_RPC_ITEM_FAILED] = {
263  .sd_flags = M0_SDF_FINAL,
264  .sd_name = "FAILED",
265  .sd_allowed = M0_BITS(M0_RPC_ITEM_URGENT, /* resend */
267  },
268 };
269 
270 const struct m0_sm_conf outgoing_item_sm_conf = {
271  .scf_name = "Outgoing-RPC-Item-sm",
272  .scf_nr_states = ARRAY_SIZE(outgoing_item_states),
273  .scf_state = outgoing_item_states,
274 };
275 
279  .sd_name = "UNINITIALISED",
280  .sd_allowed = 0,
281  },
283  .sd_flags = M0_SDF_INITIAL | M0_SDF_FINAL,
284  .sd_name = "INITIALISED",
285  .sd_allowed = M0_BITS(M0_RPC_ITEM_ACCEPTED,
288  },
290  .sd_flags = M0_SDF_FINAL,
291  .sd_name = "ACCEPTED",
292  .sd_allowed = M0_BITS(M0_RPC_ITEM_REPLIED,
295  },
296  [M0_RPC_ITEM_REPLIED] = {
297  .sd_flags = M0_SDF_FINAL,
298  .sd_name = "REPLIED",
299  .sd_allowed = M0_BITS(M0_RPC_ITEM_UNINITIALISED),
300  },
301  [M0_RPC_ITEM_FAILED] = {
302  .sd_flags = M0_SDF_FINAL,
303  .sd_name = "FAILED",
304  .sd_allowed = M0_BITS(M0_RPC_ITEM_UNINITIALISED),
305  },
306 };
307 
308 const struct m0_sm_conf incoming_item_sm_conf = {
309  .scf_name = "Incoming-RPC-Item-sm",
310  .scf_nr_states = ARRAY_SIZE(incoming_item_states),
311  .scf_state = incoming_item_states,
312 };
313 
314 M0_INTERNAL bool m0_rpc_item_invariant(const struct m0_rpc_item *item)
315 {
316  int state;
317  bool req;
318  bool rply;
319  bool oneway;
320 
321  if (item == NULL || item->ri_type == NULL)
322  return false;
323 
324  state = item->ri_sm.sm_state;
326  rply = m0_rpc_item_is_reply(item);
327  oneway = m0_rpc_item_is_oneway(item);
328 
329  return item->ri_magic == M0_RPC_ITEM_MAGIC &&
332  (req + rply + oneway == 1) && /* only one of three is true */
333  equi(req || rply, item->ri_session != NULL) &&
334 
335  equi(state == M0_RPC_ITEM_FAILED, item->ri_error != 0) &&
336 
337  ergo(item->ri_reply != NULL,
338  req &&
339  M0_IN(state, (M0_RPC_ITEM_SENDING,
341  M0_RPC_ITEM_REPLIED))) &&
342 
343  equi(itemq_tlink_is_in(item), state == M0_RPC_ITEM_ENQUEUED) &&
344  equi(item->ri_itemq != NULL, state == M0_RPC_ITEM_ENQUEUED) &&
345 
346  equi(packet_item_tlink_is_in(item),
347  state == M0_RPC_ITEM_SENDING);
348 
349 }
350 
351 M0_INTERNAL const char *item_state_name(const struct m0_rpc_item *item)
352 {
354 }
355 
356 M0_INTERNAL const char *item_kind(const struct m0_rpc_item *item)
357 {
358  return item->ri_type == NULL ? "UNKIND" :
359  m0_rpc_item_is_request(item) ? "REQUEST" :
360  m0_rpc_item_is_reply(item) ? "REPLY" :
361  m0_rpc_item_is_oneway(item) ? "ONEWAY" : "INVALID_KIND";
362 }
363 
365  const struct m0_rpc_item_type *itype)
366 {
368  M0_PRE(item != NULL && itype != NULL);
369  M0_PRE(M0_IS0(item));
370 
371  item->ri_type = itype;
374 
376  item->ri_nr_sent_max = ~(uint64_t)0;
377 
378  item->ri_xid_assigned_here = false;
379 
380  packet_item_tlink_init(item);
381  itemq_tlink_init(item);
382  rpcitem_tlink_init(item);
383  rpcitem_tlist_init(&item->ri_compound_items);
384  ric_tlink_init(item);
385  pending_item_tlink_init(item);
386  xidl_tlink_init(item);
389  /* item->ri_sm will be initialised when the item is posted */
390  M0_LEAVE();
391 }
392 M0_EXPORTED(m0_rpc_item_init);
393 
395 {
397  /*
398  * Reset cookie so that a finalised item is not matched
399  * when a late reply arrives.
400  */
401  item->ri_cookid = 0;
402 
405 
408 
409  if (item->ri_reply != NULL) {
411  item->ri_reply = NULL;
412  }
413  if (itemq_tlink_is_in(item))
415 
417 
418  M0_ASSERT(!ric_tlink_is_in(item));
419  M0_ASSERT(!itemq_tlink_is_in(item));
420  M0_ASSERT(!packet_item_tlink_is_in(item));
421  M0_ASSERT(!rpcitem_tlink_is_in(item));
422  M0_ASSERT(!pending_item_tlink_is_in(item));
423  ric_tlink_fini(item);
424  itemq_tlink_fini(item);
425  packet_item_tlink_fini(item);
426  rpcitem_tlink_fini(item);
427  rpcitem_tlist_fini(&item->ri_compound_items);
428  pending_item_tlink_fini(item);
429  xidl_tlink_fini(item);
430  M0_LEAVE();
431 }
432 M0_EXPORTED(m0_rpc_item_fini);
433 
435 {
436  M0_PRE(item != NULL && item->ri_type != NULL &&
437  item->ri_type->rit_ops != NULL &&
439 
441 }
442 
444 {
445  M0_PRE(item != NULL && item->ri_type != NULL &&
446  item->ri_type->rit_ops != NULL &&
448  item->ri_rmachine != NULL);
450 
452 }
453 
455 {
456  struct m0_rpc_machine *rmach;
457 
459  /*
460  * must store rpc machine pointer on stack before rpc item is put, and
461  * has the pointer corrupted as the result
462  */
463  rmach = item->ri_rmachine;
464 
465  m0_rpc_machine_lock(rmach);
467  m0_rpc_machine_unlock(rmach);
468 }
469 
471 {
472  if (item->ri_size == 0)
476  M0_ASSERT(item->ri_size != 0);
477  return item->ri_size;
478 }
479 
481 {
482  M0_PRE(item->ri_type != NULL &&
483  item->ri_type->rit_ops != NULL &&
485 
487 }
488 
489 M0_INTERNAL
491  struct m0_rpc_session *session)
492 {
493  M0_PRE(item != NULL);
494  M0_PRE(session != NULL);
495 
496  if (M0_FI_ENABLED("payload_too_large1") ||
497  M0_FI_ENABLED("payload_too_large2"))
498  return true;
499 
500  return (m0_rpc_item_payload_size(item) >
502 }
503 
504 M0_INTERNAL bool m0_rpc_item_is_update(const struct m0_rpc_item *item)
505 {
506  return (item->ri_type->rit_flags & M0_RPC_ITEM_TYPE_MUTABO) != 0;
507 }
508 
509 M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
510 {
511  M0_PRE(item != NULL && item->ri_type != NULL);
512 
514 }
515 
516 M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
517 {
518  M0_PRE(item != NULL && item->ri_type != NULL);
519 
520  return (item->ri_type->rit_flags & M0_RPC_ITEM_TYPE_REPLY) != 0;
521 }
522 
523 M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
524 {
525  M0_PRE(item != NULL);
526  M0_PRE(item->ri_type != NULL);
527 
528  return (item->ri_type->rit_flags & M0_RPC_ITEM_TYPE_ONEWAY) != 0;
529 }
530 
531 static bool rpc_item_needs_xid(const struct m0_rpc_item *item)
532 {
533  return !m0_rpc_item_is_oneway(item) &&
534  !M0_IN(item->ri_type->rit_opcode,
541 }
542 
543 M0_INTERNAL void m0_rpc_item_xid_min_update(struct m0_rpc_item *item)
544 {
546 
547  M0_ENTRY("item="ITEM_FMT" osr_xid=%" PRIu64 " "
548  "osr_session_xid_min=%"PRIu64,
551  if (rpc_item_needs_xid(item)) {
553  M0_ASSERT(!(M0_IN(item->ri_header.osr_xid, (0, UINT64_MAX))));
555  xidl_tlist_is_empty(&item->ri_session->s_xid_list) ? 1 :
556  xidl_tlist_head(&item->ri_session->s_xid_list)->
557  ri_header.osr_xid;
558  }
559  M0_LEAVE("item="ITEM_FMT" osr_xid=%" PRIu64 " "
560  "osr_session_xid_min=%"PRIu64,
563 }
564 
565 M0_INTERNAL void m0_rpc_item_xid_assign(struct m0_rpc_item *item)
566 {
568 
569  M0_ENTRY("item: "ITEM_FMT" nr_sent=%d xid=%"PRIu64, ITEM_ARG(item),
571  /*
572  * xid needs to be assigned only once.
573  * At this point ri_nr_sent is already incremented.
574  *
575  * xid for reply is not changed. It is already set to the request xid.
576  */
577  if (item->ri_nr_sent == 1 && !m0_rpc_item_is_reply(item)) {
579  ++item->ri_session->s_xid :
580  UINT64_MAX;
581  if (rpc_item_needs_xid(item)) {
582  M0_LOG(M0_DEBUG, "xidl_tlist_add_tail(), item="ITEM_FMT,
583  ITEM_ARG(item));
584  xidl_tlist_add_tail(&item->ri_session->s_xid_list,
585  item);
586  item->ri_xid_assigned_here = true;
587  }
588  M0_LOG(M0_DEBUG, ITEM_FMT" set item xid=%" PRIu64 " "
589  "s_xid=%"PRIu64, ITEM_ARG(item), item->ri_header.osr_xid,
591  item->ri_session->s_xid);
592  }
593  M0_LEAVE();
594 }
595 
597 {
598  M0_ENTRY("item="ITEM_FMT" osr_xid=%" PRIu64 " "
599  "osr_session_xid_min=%" PRIu64 " ri_xid_assigned_here=%d",
603  if (item->ri_xid_assigned_here) {
605  s_conn->c_rpc_machine));
606  xidl_tlist_del(item);
607  }
608 }
609 
617 M0_INTERNAL bool m0_rpc_item_xid_check(struct m0_rpc_item *item,
618  struct m0_rpc_item **next)
619 {
620  struct m0_rpc_session *sess = item->ri_session;
621  uint64_t xid = item->ri_header.osr_xid;
622  struct m0_rpc_item *cached;
623 
624  /* If item doesn't need xid then xid doesn't need to be checked */
625  if (!rpc_item_needs_xid(item))
626  return true;
627 
628  M0_LOG(M0_DEBUG, "item: "ITEM_FMT" session=%p osr_xid=%" PRIu64 " "
629  "osr_session_xid_min=%" PRIu64 " s_xid=%"PRIu64,
631  sess->s_xid);
632  /*
633  * Purge cache on every N-th packet
634  * (not on every one - that could be pretty expensive).
635  */
636  if ((xid & 0xff) == 0)
638 
639  /*
640  * Check if an item had been cancelled on the other side.
641  * If it had, sess->s_xid has to be updated, otherwise this rpc session
642  * will wait indefinitely for the item, which is not going to come.
643  */
645  if (sess->s_xid < item->ri_header.osr_session_xid_min - 1) {
646  M0_LOG(M0_WARN,
647  "RPC items had been cancelled on the other side."
648  " Changing session %p xid from %" PRIu64 " to %" PRIu64 ".",
649  sess, sess->s_xid,
652  }
653  /*
654  * The new item which wasn't handled yet.
655  *
656  * XXX: Note, on a high loads, reply cache may grow huge.
657  * This can be optimized by implementing the sending
658  * of the last consequent received reply xid in every
659  * request from the client.
660  */
661  if (xid == sess->s_xid + 1) { /* Normal case. */
662  ++sess->s_xid;
663  if (next != NULL)
665  xid + 1);
666  return M0_RC(true);
667  } else if (m0_mod_gt(xid, sess->s_xid)) {
668  /* Out-of-order case. Cache it for the future. */
669  cached = m0_rpc_item_cache_lookup(&sess->s_req_cache, xid);
670  if (cached == NULL)
673  /*
674  * Misordered request without reply - drop it, now that it
675  * is present in the cache.
676  */
677  M0_LOG(M0_NOTICE, "item: "ITEM_FMT" session=%p misordered=%"
678  PRIu64" != %"PRIu64, ITEM_ARG(item), sess, xid,
679  sess->s_xid + 1);
680  return M0_RC(false);
681  }
682 
683  /* Resend cached reply if it is available for the request. */
684  cached = m0_rpc_item_cache_lookup(&sess->s_reply_cache, xid);
685  if (cached != NULL) {
686  M0_LOG(M0_DEBUG, "cached_reply=%p xid=%" PRIu64 " state=%d",
687  cached, xid, cached->ri_sm.sm_state);
688  if (M0_IN(cached->ri_sm.sm_state, (M0_RPC_ITEM_SENT,
689  M0_RPC_ITEM_FAILED))) {
692  if (cached->ri_error == -ENETDOWN)
693  cached->ri_error = 0;
694  m0_rpc_item_send_reply(item, cached);
695  }
696  return M0_RC(false);
697  } else
698  M0_LOG(M0_DEBUG, "item: "ITEM_FMT" xid=%" PRIu64 " s_xid=%"PRIu64
699  " No reply found", ITEM_ARG(item), xid, sess->s_xid);
700 
701  return M0_RC(false);
702 }
703 
704 M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item,
705  enum m0_rpc_item_dir dir)
706 {
707  const struct m0_sm_conf *conf;
708 
709  M0_PRE(item != NULL && item->ri_rmachine != NULL);
710 
713 
714  M0_LOG(M0_DEBUG, "%p UNINITIALISED -> INITIALISED", item);
718 }
719 
720 M0_INTERNAL void m0_rpc_item_sm_fini(struct m0_rpc_item *item)
721 {
722  M0_PRE(item != NULL);
723 
724  m0_sm_fini(&item->ri_sm);
726 }
727 
728 M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item,
729  enum m0_rpc_item_state state)
730 {
731  M0_PRE(item != NULL);
733 
734  M0_LOG(M0_DEBUG, ITEM_FMT" xid=%"PRIu64 " %s -> %s sm=%s",
738 
739  m0_sm_state_set(&item->ri_sm, state);
740 }
741 
742 M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
743 {
744  struct m0_rpc_session *session = NULL;
745  struct m0_rpc_conn *conn = NULL;
746  struct m0_rpc_machine *rpc_mc = NULL;
747 
748  M0_PRE(item != NULL && rc != 0);
751 
752  M0_ENTRY("FAILED "ITEM_FMT" xid=%" PRIu64 " state=%s session=%p error=%d",
755 
756  if (item->ri_session != NULL) {
758  if (session->s_conn != NULL) {
759  conn = session->s_conn;
760  if (conn != NULL) {
761  rpc_mc = conn->c_rpc_machine;
762  }
763  }
764  }
765  if (false) { /* XXX disabled temporarily */
768  "RPC_ITEM_FAILED "ITEM_FMT" xid=%" PRIu64 " "
769  "state=\"%s\" rpc_machine_ep=%s "
770  "remote_machine_ep=%s error=%d", ITEM_ARG(item),
772  (rpc_mc == NULL ) ? "NOT_AVAILABLE" :
773  m0_rpc_machine_ep(rpc_mc),
774  (session == NULL ) ? "NOT_AVAILABLE" :
776  }
777 
779 
781  M0_IN(item->ri_sm.sm_state,
786 
787  /*
788  * Request and Reply items take hold on session until
789  * they are SENT/FAILED.
790  * See: m0_rpc__post_locked(), m0_rpc_reply_post()
791  * m0_rpc_item_send()
792  */
798 
799  item->ri_error = rc;
802  /* XXX ->rio_sent() can be called multiple times (due to cancel). */
805  /*
806  * Reference release done here is for the reference taken
807  * while submitting item to formation, using m0_rpc_frm_enq_item().
808  */
810  M0_LEAVE();
811 }
812 
814  uint64_t states, m0_time_t timeout)
815 {
816  int rc;
817 
821  return M0_RC(rc);
822 }
823 
825 {
826  int rc;
827 
829 
833  timeout);
834  if (rc == 0 && item->ri_sm.sm_state == M0_RPC_ITEM_FAILED)
835  rc = item->ri_error;
836 
838  return M0_RC(rc);
839 }
840 
841 static void item_cancel_fi(struct m0_rpc_item *item)
842 {
843  uint32_t sm_state = item->ri_sm.sm_state;
844  uint64_t ref = m0_ref_read(&(m0_rpc_item_to_fop(item)->f_ref));
845 
846  if (M0_FI_ENABLED("cancel_enqueued_item")) {
848  M0_ASSERT(pending_item_tlink_is_in(item));
849  M0_ASSERT(ref == 5);
850  } else if (M0_FI_ENABLED("cancel_sending_item")) {
852  M0_ASSERT(pending_item_tlink_is_in(item));
853  M0_ASSERT(ref == 5);
854  } else if (M0_FI_ENABLED("cancel_waiting_for_reply_item")) {
856  M0_ASSERT(pending_item_tlink_is_in(item));
857  M0_ASSERT(ref == 3);
858  } else if (M0_FI_ENABLED("cancel_replied_item")) {
862  M0_ASSERT(!pending_item_tlink_is_in(item));
863  M0_ASSERT(ref == 1);
864  }
865 }
866 
868 {
869  struct m0_rpc_session *session;
870 
871  M0_PRE(item != NULL);
872  M0_PRE(item->ri_session != NULL);
876 
878 
879  M0_ENTRY("item "ITEM_FMT" session=%p", ITEM_ARG(item), session);
880 
882 
883  /*
884  * Note: The item which is requested to be cancelled, may have been
885  * released by the time the API was invoked. Hence, check if the item
886  * is still part of the pending_cache.
887  */
888  if (!pending_item_tlink_is_in(item)) {
889  M0_LOG(M0_DEBUG, "Item %p, not present in the pending item "
890  "cache", item);
891  return;
892  }
893 
894  M0_LOG(M0_DEBUG, ITEM_FMT" item->ri_sm.sm_state=%d ri_error=%d "
895  "ref=%llu", ITEM_ARG(item), item->ri_sm.sm_state, item->ri_error,
896  (unsigned long long)m0_ref_read(
897  &(m0_rpc_item_to_fop(item)->f_ref)));
898 
902  /*
903  * Release the reference taken either in m0_rpc_item_send() or
904  * in m0_rpc_oneway_item_post_locked().
905  */
907  }
908 
912 
913  if (packet_item_tlink_is_in(item)) {
917  }
918 
919  M0_ASSERT(m0_ref_read(&(m0_rpc_item_to_fop(item)->f_ref)) >= 2);
920  m0_rpc_item_failed(item, -ECANCELED);
921 
922  M0_POST(!itemq_tlink_is_in(item));
923  M0_POST(!packet_item_tlink_is_in(item));
924  M0_POST(!rpcitem_tlink_is_in(item));
925  M0_POST(!pending_item_tlink_is_in(item));
926  M0_LOG(M0_DEBUG, ITEM_FMT" session=%p item cancelled ref=%llu",
928  (unsigned long long)m0_ref_read(
929  &(m0_rpc_item_to_fop(item)->f_ref)));
930 }
931 
933 {
934  struct m0_rpc_machine *mach;
935 
936  M0_PRE(item != NULL);
937  M0_PRE(item->ri_session != NULL);
938 
939  M0_ENTRY("item %p, session %p", item, item->ri_session);
944  M0_LEAVE("item %p, session %p", item, item->ri_session);
945 }
946 
948 {
949  struct m0_rpc_machine *mach;
950  const struct m0_rpc_item_type *ri_type;
951 
952  M0_PRE(item != NULL);
953  M0_PRE(item->ri_session != NULL);
954 
955  M0_ENTRY("item %p, session %p", item, item->ri_session);
959  /* Re-initialise item. User may want to re-post it. */
960  ri_type = item->ri_type;
961  item->ri_error = 0;
963  M0_SET0(item);
964  m0_rpc_item_init(item, ri_type);
966  M0_LOG(M0_DEBUG, ITEM_FMT" item re-initialised ref=%llu",
967  ITEM_ARG(item),
968  (unsigned long long)m0_ref_read(
969  &(m0_rpc_item_to_fop(item)->f_ref)));
970  M0_LEAVE();
971 }
972 
973 int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
974 {
975  return item->ri_error ?: (item->ri_reply == NULL ? 0 :
977 }
978 M0_EXPORTED(m0_rpc_item_error);
979 
981 {
982  struct m0_rpc_item *item = M0_AMB(item, mach, ri_sm);
983  struct m0_rpc_frm *frm = item->ri_frm;
984 
986  M0_LOG(M0_DEBUG, ""ITEM_FMT" ENQUEUED -> URGENT",
987  ITEM_ARG(item));
989  /*
990  * m0_rpc_frm_item_deadline_passed() might reenter in
991  * m0_sm_state_set() and modify item state.
992  * So at this point the item may or may not be in URGENT state.
993  */
994  }
995  return -1;
996 }
997 
998 M0_INTERNAL int m0_rpc_item_timer_start(struct m0_rpc_item *item)
999 {
1001 
1002  if (M0_FI_ENABLED("failed")) {
1003  M0_LOG(M0_DEBUG, "item %p failed to start timer", item);
1004  return M0_ERR(-EINVAL);
1005  }
1007  return 0;
1008 
1009  M0_LOG(M0_DEBUG, "item %p Starting timer", item);
1013  item_timer_cb,
1016 }
1017 
1018 M0_INTERNAL void m0_rpc_item_timer_stop(struct m0_rpc_item *item)
1019 {
1022  M0_LOG(M0_DEBUG, "%p Stopping timer", item);
1024  }
1025 }
1026 
1027 static void item_timer_cb(struct m0_sm_timer *timer)
1028 {
1029  struct m0_rpc_item *item;
1030 
1031  M0_ENTRY();
1032  M0_PRE(timer != NULL);
1033 
1034  item = container_of(timer, struct m0_rpc_item, ri_timer);
1037 
1038  M0_LOG(M0_DEBUG, ""ITEM_FMT" %s Timer elapsed.",
1040 
1043  else
1044  item_resend(item);
1045 }
1046 
1047 static void item_timedout(struct m0_rpc_item *item)
1048 {
1049  struct m0_rpc_conn *conn = item->ri_session->s_conn;
1050 
1051  M0_LOG(M0_DEBUG, ITEM_FMT" %s TIMEDOUT.", ITEM_ARG(item),
1054 
1056  switch (item->ri_sm.sm_state) {
1057  case M0_RPC_ITEM_ENQUEUED:
1058  case M0_RPC_ITEM_URGENT:
1060  m0_rpc_item_failed(item, -ETIMEDOUT);
1062  break;
1063 
1064  case M0_RPC_ITEM_SENDING:
1065  item->ri_error = -ETIMEDOUT;
1066  /* item will be moved to FAILED state in item_done() */
1067  break;
1068 
1070  m0_rpc_item_failed(item, -ETIMEDOUT);
1071  break;
1072 
1073  default:
1074  M0_ASSERT(false);
1075  }
1076  M0_LEAVE();
1077 }
1078 
1079 static void item_resend(struct m0_rpc_item *item)
1080 {
1081  int rc;
1082 
1084 
1086  switch (item->ri_sm.sm_state) {
1087  case M0_RPC_ITEM_ENQUEUED:
1088  case M0_RPC_ITEM_URGENT:
1090  /* XXX already completed requests??? */
1091  if (rc != 0) {
1093  m0_rpc_item_failed(item, -ETIMEDOUT);
1094  } else {
1096  item2conn(item));
1097  }
1098  break;
1099 
1100  case M0_RPC_ITEM_SENDING:
1103  break;
1104 
1107  M0_LOG(M0_DEBUG,
1108  "session state %d does not allow sending",
1110  return;
1111  }
1113  break;
1114 
1115  default:
1116  M0_ASSERT_INFO(false, "item->ri_sm.sm_state = %d",
1117  item->ri_sm.sm_state);
1118  }
1119  M0_LEAVE();
1120 }
1121 
1122 static int item_conn_test(struct m0_rpc_item *item)
1123 {
1124  M0_PRE(item != NULL && item->ri_session != NULL);
1126  -ECANCELED : 0;
1127 }
1128 
1129 M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
1130 {
1131  uint32_t state = item->ri_sm.sm_state;
1132  int rc;
1133 
1134  M0_ENTRY(ITEM_FMT" dest_ep=%s ri_session=%p ri_nr_sent_max=%"PRIu64
1135  " ri_deadline=%" PRIu64 " ri_nr_sent=%u", ITEM_ARG(item),
1138  item->ri_nr_sent);
1142  M0_IN(state, (M0_RPC_ITEM_INITIALISED,
1145  M0_RPC_ITEM_FAILED))) &&
1147  M0_IN(state, (M0_RPC_ITEM_INITIALISED,
1149  M0_RPC_ITEM_FAILED))));
1150 
1154  if (rc != 0) {
1156  M0_LEAVE();
1157  return;
1158  }
1159  }
1160 
1161  item->ri_nr_sent++;
1162 
1165 
1166  if (M0_FI_ENABLED("advance_deadline")) {
1167  M0_LOG(M0_DEBUG,"%p deadline advanced", item);
1168  item->ri_deadline = m0_time_from_now(0, 500 * 1000 * 1000);
1169  }
1170 
1171  /*
1172  * This hold will be released when the item is SENT or FAILED.
1173  * See rpc/frmops.c:item_sent() and m0_rpc_item_failed()
1174  */
1175 
1178  /*
1179  * Rpc always acquires an *internal* reference to "all" items.
1180  * This reference is released in item_sent()
1181  */
1182  if (item->ri_error == 0)
1184  M0_LEAVE();
1185 }
1186 
1187 M0_INTERNAL const char *
1189 {
1190  M0_PRE(item != NULL && item->ri_session != NULL);
1191 
1192  return m0_rpc_conn_addr(item2conn(item));
1193 }
1194 
1195 M0_INTERNAL const char *m0_rpc_item_opname(const struct m0_rpc_item *item)
1196 {
1198  "UNTYPE";
1199 }
1200 
1201 M0_INTERNAL const char *
1202 m0_rpc_item_type_name(const struct m0_rpc_item_type *item_type)
1203 {
1204  return m0_xcode_enum_print(&m0_xc_M0_RPC_OPCODES_enum,
1205  item_type->rit_opcode, NULL);
1206 }
1207 
1208 M0_INTERNAL int m0_rpc_item_received(struct m0_rpc_item *item,
1209  struct m0_rpc_machine *machine)
1210 {
1211  struct m0_rpc_item *req;
1212  struct m0_rpc_conn *conn;
1213  struct m0_rpc_session *sess;
1214  struct m0_rpc_item *next;
1215  uint64_t item_sm_id;
1216  int rc = 0;
1217 
1218  M0_PRE(item != NULL);
1220 
1221  M0_ENTRY(ITEM_FMT" xid=%llu machine=%p", ITEM_ARG(item),
1222  (unsigned long long)item->ri_header.osr_xid, machine);
1223 
1224  item_sm_id = m0_sm_id_get(&item->ri_sm);
1226  item_sm_id,
1227  (uint64_t)item->ri_type->rit_opcode,
1230 
1232 
1233  if (m0_rpc_item_is_oneway(item)) {
1235  return M0_RC(0);
1236  }
1237 
1239 
1242  return M0_RC(0);
1243  }
1244 
1246  if (conn == NULL)
1247  return M0_RC(-ENOENT);
1249  if (sess == NULL)
1250  return M0_RC(-ENOENT);
1251  item->ri_session = sess;
1252 
1253  /*
1254  * If item is a request, then it may be the first arrival of the
1255  * request item, or the same request may be sent again if resend
1256  * interval passed. In either case item shouldn't be handled again
1257  * and reply should be sent again if the item is already handled.
1258  *
1259  * To prevent second handling of the same item, xid is assigned to each
1260  * eligible rpc item (see rpc_item_needs_xid()). It is checked on the
1261  * other side (in this function). Session on the client and on the
1262  * server has xid counter, and items with wrong xid are dropped after
1263  * adding those to the item cache if not present there already.
1264  * In case if there is a reply in the reply cache, the reply is sent
1265  * again.
1266  *
1267  * Note that there is no duplicate or out-of-order detection for oneway
1268  * or connection establish rpc items.
1269  *
1270  * xid-based duplicate and out-of-order checks are only temporary
1271  * solutions until DTM is implemented.
1272  */
1274  for (next = NULL;
1276  item = next) {
1281  if (rc != 0) {
1282  /*
1283  * In case of ESHUTDOWN, when we are not fully
1284  * started yet, we just drop the request items
1285  * without any reply. So we should rollback our
1286  * s_xid counter to avoid desynchronization.
1287  * Otherwise, the sender would just get stuck on
1288  * re-sending its requests (with the same xid)
1289  * as we would just drop them as a stale ones
1290  * (even after we would become ready to serve
1291  * them already).
1292  */
1293  if (rc == -ESHUTDOWN)
1294  --sess->s_xid;
1295  m0_rpc_session_release(sess);
1296  break;
1297  }
1298  }
1299  } else {
1301  }
1302 
1303  return M0_RC(rc);
1304 }
1305 
1307  struct m0_rpc_item **req_out)
1308 {
1309  struct m0_rpc_item *req;
1310  int rc;
1311 
1312  M0_PRE(reply != NULL && req_out != NULL);
1313  M0_ENTRY("item_reply: "ITEM_FMT, ITEM_ARG(reply));
1314 
1315  *req_out = NULL;
1316 
1317  req = m0_cookie_of(&reply->ri_header.osr_cookie,
1318  struct m0_rpc_item, ri_cookid);
1319  if (req == NULL) {
1320  /*
1321  * Either it is a duplicate reply and its corresponding request
1322  * item is pruned from the item list, or it is a corrupted
1323  * reply, or it is meant for a request that was cancelled.
1324  */
1325  M0_LOG(M0_DEBUG, "request not found for reply item "ITEM_FMT,
1326  ITEM_ARG(reply));
1327  return M0_RC(-EPROTO);
1328  }
1329  M0_LOG(M0_DEBUG, "req "ITEM_FMT", reply "ITEM_FMT,
1330  ITEM_ARG(req), ITEM_ARG(reply));
1331 
1333  return M0_RC(-EREMOTE);
1334 
1335  rc = req_replied(req, reply);
1336  if (rc == 0)
1337  *req_out = req;
1338 
1339  return M0_RC(rc);
1340 }
1341 
1343  struct m0_rpc_item *reply)
1344 {
1345  if (M0_FI_ENABLED("drop_create_item_reply") &&
1346  req->ri_type->rit_opcode == M0_IOSERVICE_COB_CREATE_OPCODE) {
1347  M0_LOG(M0_DEBUG, ITEM_FMT" create reply dropped",
1348  ITEM_ARG(reply));
1349  return true;
1350  }
1351  if (M0_FI_ENABLED("drop_delete_item_reply") &&
1352  req->ri_type->rit_opcode == M0_IOSERVICE_COB_DELETE_OPCODE) {
1353  M0_LOG(M0_DEBUG, ITEM_FMT" delete reply dropped",
1354  ITEM_ARG(reply));
1355  return true;
1356  }
1357  return false;
1358 }
1359 
1360 static int req_replied(struct m0_rpc_item *req, struct m0_rpc_item *reply)
1361 {
1362  int rc = 0;
1363 
1364  M0_PRE(req != NULL && reply != NULL);
1365 
1366  if (req->ri_error == -ETIMEDOUT) {
1367  /*
1368  * The reply is valid but too late. Do nothing.
1369  */
1370  M0_LOG(M0_DEBUG, "rply rcvd, timedout req "ITEM_FMT,
1371  ITEM_ARG(req));
1372  rc = -EPROTO;
1373  } else if (req->ri_error != 0) {
1374  M0_LOG(M0_NOTICE, "rply rcvd, req "ITEM_FMT" error=%d",
1375  ITEM_ARG(req), req->ri_error);
1376  rc = -EPROTO;
1377  } else {
1378  /*
1379  * This is valid reply case.
1380  */
1382 
1383  switch (req->ri_sm.sm_state) {
1384  case M0_RPC_ITEM_ENQUEUED:
1385  case M0_RPC_ITEM_URGENT:
1386  /*
1387  * Reply received while we were trying to resend.
1388  */
1390  &item2conn(req)->c_rpcchan->rc_frm,
1391  req);
1393  m0_rpc_session_release(req->ri_session);
1394  /*
1395  * We took get(req) at m0_rpc_item_send() and but
1396  * the correspondent put() at item_sent() won't be
1397  * called already, so we have to put() it here.
1398  */
1400  break;
1401 
1402  case M0_RPC_ITEM_SENDING:
1403  /*
1404  * Buffer sent callback is still pending;
1405  * postpone reply processing.
1406  * item_sent() will process the reply.
1407  */
1408  M0_LOG(M0_DEBUG, "req: %p rply: %p rply postponed",
1409  req, reply);
1410  req->ri_pending_reply = reply;
1411  break;
1412 
1413  case M0_RPC_ITEM_ACCEPTED:
1416  break;
1417 
1418  case M0_RPC_ITEM_REPLIED:
1419  /* Duplicate reply. Drop it. */
1420  req->ri_rmachine->rm_stats.rs_nr_dropped_items++;
1422  break;
1423 
1424  default:
1425  M0_ASSERT(false);
1426  }
1427  }
1428  return M0_RC(rc);
1429 }
1430 
1436 {
1437  struct m0_rpc_conn *conn;
1438  struct m0_conf_obj *svc_obj;
1439 
1441  conn = item2conn(item);
1442  svc_obj = m0_rpc_conn2svc(conn);
1443  if (svc_obj == NULL)
1444  goto leave; /*
1445  * connection is not subscribed to HA notes, so no
1446  * reaction is expected on reply
1447  */
1448  if (svc_obj->co_ha_state == M0_NC_TRANSIENT) {
1451  }
1452 leave:
1453  M0_LEAVE();
1454 }
1455 
1456 static void addb2_add_rpc_attrs(const struct m0_rpc_item *req)
1457 {
1459  M0_AVI_RPC_ATTR_OPCODE, req->ri_type->rit_opcode);
1461  M0_AVI_RPC_ATTR_NR_SENT, req->ri_nr_sent);
1462 }
1463 
1464 M0_INTERNAL void m0_rpc_item_process_reply(struct m0_rpc_item *req,
1465  struct m0_rpc_item *reply)
1466 {
1468 
1469  M0_PRE(req != NULL && reply != NULL);
1471  M0_PRE(M0_IN(req->ri_sm.sm_state, (M0_RPC_ITEM_WAITING_FOR_REPLY,
1473  M0_RPC_ITEM_URGENT)));
1476  req->ri_reply = reply;
1481 
1482  /*
1483  * Only attributes of reasonable RPC items are logged
1484  * due to performance aspect.
1485  */
1486  if (M0_IN(req->ri_type->rit_opcode, (M0_IOSERVICE_READV_OPCODE,
1495  /*
1496  * Reference release done here is for the reference taken in
1497  * m0_rpc__post_locked() for request items.
1498  */
1500  M0_LEAVE();
1501 }
1502 
1503 M0_INTERNAL void m0_rpc_item_send_reply(struct m0_rpc_item *req,
1504  struct m0_rpc_item *reply)
1505 {
1506  struct m0_rpc_session *sess;
1507 
1508  M0_PRE(req != NULL && reply != NULL);
1510  M0_PRE(M0_IN(req->ri_sm.sm_state, (M0_RPC_ITEM_ACCEPTED)));
1511  M0_PRE(M0_IN(req->ri_reply, (NULL, reply)));
1512 
1513  M0_ENTRY("req="ITEM_FMT" reply="ITEM_FMT,
1514  ITEM_ARG(req), ITEM_ARG(reply));
1515 
1516  if (req->ri_reply == NULL) {
1518  req->ri_reply = reply;
1519  }
1521 
1522  sess = req->ri_session;
1523  reply->ri_header = req->ri_header;
1524  if (rpc_item_needs_xid(req))
1528  m0_rpc_session_release(sess);
1529 
1530  /*
1531  * An extra reference is acquired for this reply item
1532  * at the end of m0_rpc_item_send() if it is sent successfully.
1533  * This extra reference will be released along with request
1534  * put in m0_fom_fini() -> m0_fop_put() -> m0_ref_put() ->
1535  * m0_fop_release() -> m0_fop_fini() -> m0_rpc_item_put() if
1536  * req->ri_reply is set. If the reply item is failed to send,
1537  * no extra reference is acquired for that reply item in
1538  * m0_rpc_item_send(). So this req->ri_reply must be cleared
1539  * in this error case.
1540  */
1541  if (reply->ri_error != 0)
1542  req->ri_reply = NULL;
1543 
1544  M0_LEAVE();
1545 }
1546 
1547 M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic,
1548  struct m0_mutex *lock)
1549 {
1550  int i;
1551 
1553  if (ic->ric_items == NULL)
1554  return M0_ERR(-ENOMEM);
1555 
1556  for (i = 0; i < RIC_HASH_SIZE; ++i)
1557  ric_tlist_init(&ic->ric_items[i]);
1558 
1559  ic->ric_lock = lock;
1560 
1562 
1563  return 0;
1564 }
1565 
1566 M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
1567 {
1568  int i;
1569 
1571 
1573  for (i = 0; i < RIC_HASH_SIZE; ++i)
1574  ric_tlist_fini(&ic->ric_items[i]);
1575  m0_free(ic->ric_items);
1576 }
1577 
1579 {
1580  return m0_mutex_is_locked(ic->ric_lock);
1581 }
1582 
1583 M0_INTERNAL bool m0_rpc_item_cache_add(struct m0_rpc_item_cache *ic,
1584  struct m0_rpc_item *item,
1585  m0_time_t deadline)
1586 {
1587  struct m0_rpc_item *cached;
1588 
1589  M0_ENTRY("item: "ITEM_FMT" xid=%"PRIu64, ITEM_ARG(item),
1592 
1594  if (cached == NULL) {
1596  ric_tlink_init_at(item, &ic->ric_items[item->ri_header.osr_xid &
1597  RIC_HASH_MASK]);
1598  }
1599  M0_ASSERT_INFO(M0_IN(cached, (NULL, item)), "duplicate xid=%"PRIu64
1600  " item=%p", item->ri_header.osr_xid, item);
1601  item->ri_cache_deadline = deadline;
1602 
1603  return M0_RC(cached == NULL);
1604 }
1605 
1606 static void rpc_item_cache_del(struct m0_rpc_item_cache *ic,
1607  struct m0_rpc_item *item)
1608 {
1609  M0_ENTRY("item: "ITEM_FMT" xid=%"PRIu64, ITEM_ARG(item),
1611  ric_tlink_del_fini(item);
1613  M0_LEAVE();
1614 }
1615 
1616 M0_INTERNAL void m0_rpc_item_cache_del(struct m0_rpc_item_cache *ic,
1617  uint64_t xid)
1618 {
1619  struct m0_rpc_item *cached;
1620 
1621  M0_ENTRY("xid=%"PRIu64, xid);
1623 
1624  cached = m0_rpc_item_cache_lookup(ic, xid);
1625  if (cached != NULL)
1626  rpc_item_cache_del(ic, cached);
1627  M0_LEAVE();
1628 }
1629 
1630 M0_INTERNAL struct m0_rpc_item *
1632 {
1634 
1635  return m0_tl_find(ric, item, &ic->ric_items[xid & RIC_HASH_MASK],
1636  item->ri_header.osr_xid == xid);
1637 }
1638 
1639 M0_INTERNAL void m0_rpc_item_cache_purge(struct m0_rpc_item_cache *ic)
1640 {
1641  int i;
1642  struct m0_rpc_item *item;
1643  m0_time_t now = m0_time_now();
1644 
1646 
1647  for (i = 0; i < RIC_HASH_SIZE; ++i) {
1648  m0_tl_for(ric, &ic->ric_items[i], item) {
1649  if (now > item->ri_cache_deadline)
1650  rpc_item_cache_del(ic, item);
1651  } m0_tl_endfor;
1652  }
1653 }
1654 
1655 M0_INTERNAL void m0_rpc_item_cache_clear(struct m0_rpc_item_cache *ic)
1656 {
1657  int i;
1658  struct m0_rpc_item *item;
1659 
1661 
1662  for (i = 0; i < RIC_HASH_SIZE; ++i) {
1663  m0_tl_for(ric, &ic->ric_items[i], item) {
1664  rpc_item_cache_del(ic, item);
1665  } m0_tl_endfor;
1666  }
1667 }
1668 
1670 {
1672  xidl_tlist_init(&session->s_xid_list);
1673 }
1674 
1676 {
1677  struct m0_rpc_item *item;
1678 
1680  m0_tl_for(xidl, &session->s_xid_list, item) {
1681  M0_LOG(M0_ERROR, "session=%p item="ITEM_FMT" ri_sm.sm_state=%d",
1683  } m0_tl_endfor;
1684  xidl_tlist_fini(&session->s_xid_list);
1685 }
1686 
1688 {
1690  pending_item_tlist_init(&session->s_pending_cache);
1691 }
1692 
1694 {
1695  struct m0_rpc_item *item;
1696 
1698  m0_tl_teardown(pending_item, &session->s_pending_cache, item) {
1700  };
1701 }
1702 
1704 {
1707  pending_item_tlist_fini(&session->s_pending_cache);
1708 }
1709 
1711 {
1713  return;
1714 
1715  M0_ENTRY(ITEM_FMT" xid=%" PRIu64 " nr_sent=%lu item->ri_reply=%p",
1717  (unsigned long)item->ri_nr_sent, item->ri_reply);
1718 
1721 
1722  if (item->ri_nr_sent > 1 && item->ri_reply == NULL) {
1723  M0_ASSERT(pending_item_tlink_is_in(item));
1724  M0_LEAVE("%p", item);
1725  return;
1726  }
1727 
1728  M0_ASSERT(!pending_item_tlink_is_in(item));
1729  /*
1730  * The item is sent for the first time OR
1731  * it has been resent after it has received reply e.g. during recovery.
1732  */
1733  M0_ASSERT((item->ri_nr_sent == 1 && item->ri_reply == NULL) ||
1734  (item->ri_nr_sent > 1 && item->ri_reply != NULL));
1736  pending_item_tlink_init_at(item, &item->ri_session->s_pending_cache);
1737  M0_POST(pending_item_tlink_is_in(item));
1738  M0_LEAVE("%p Added to pending cache", item);
1739 }
1740 
1742 {
1744  return;
1745 
1749  M0_PRE(pending_item_tlink_is_in(item));
1750 
1751  pending_item_tlink_del_fini(item);
1753  M0_LEAVE("%p Removed from pending cache", item);
1754 }
1755 
1756 M0_INTERNAL void m0_rpc_item_replied_invoke(struct m0_rpc_item *req)
1757 {
1758  if (req->ri_ops != NULL && req->ri_ops->rio_replied != NULL) {
1760  (uint64_t)req, req->ri_type->rit_opcode);
1761  req->ri_ops->rio_replied(req);
1763  }
1764 }
1765 
1766 M0_INTERNAL void m0_rpc_item_sent_invoke(struct m0_rpc_item *item)
1767 {
1768  if (item->ri_ops != NULL && item->ri_ops->rio_sent != NULL)
1769  item->ri_ops->rio_sent(item);
1770 }
1771 
1772 #undef M0_TRACE_SUBSYSTEM
1773 
1775 /*
1776  * Local variables:
1777  * c-indentation-style: "K&R"
1778  * c-basic-offset: 8
1779  * tab-width: 8
1780  * fill-column: 80
1781  * scroll-step: 1
1782  * End:
1783  */
static void item_resend(struct m0_rpc_item *item)
Definition: item.c:1079
static void sm_state(const struct m0_sm_conf *conf, struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:541
M0_INTERNAL void m0_rpc_item_xid_list_fini(struct m0_rpc_session *session)
Definition: item.c:1675
struct m0_tl * ric_items
Definition: item.h:550
static struct m0_mutex lock
Definition: transmit.c:326
uint32_t rit_opcode
Definition: item.h:474
m0_time_t ri_resend_interval
Definition: item.h:144
M0_INTERNAL bool m0_rpc_conn_is_known_dead(const struct m0_rpc_conn *conn)
Definition: conn.c:1311
#define M0_PRE(cond)
static struct m0_be_active_record_domain dummy
Definition: active_record.c:35
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
const struct m0_sm_conf incoming_item_sm_conf
Definition: item.c:60
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:728
M0_INTERNAL void m0_rpc_item_xid_list_init(struct m0_rpc_session *session)
Definition: item.c:1669
#define NULL
Definition: misc.h:38
#define ergo(a, b)
Definition: misc.h:293
struct m0_sm_timer ri_timer
Definition: item.h:176
M0_INTERNAL int m0_rpc_item_timer_start(struct m0_rpc_item *item)
Definition: item.c:998
Definition: sm.h:350
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
struct m0_tl s_xid_list
Definition: session.h:339
size_t ri_size
Definition: item.h:198
const struct m0_rpc_item_type_ops * rit_ops
Definition: item.h:476
static int item_conn_test(struct m0_rpc_item *item)
Definition: item.c:1122
static struct io_request req
Definition: file.c:100
M0_INTERNAL struct m0_rpc_conn * m0_rpc_machine_find_conn(const struct m0_rpc_machine *machine, const struct m0_rpc_item *item)
Definition: rpc_machine.c:836
M0_INTERNAL void m0_sm_timeout_init(struct m0_sm_timeout *to)
Definition: sm.c:667
M0_INTERNAL void m0_rpc_item_replied_invoke(struct m0_rpc_item *req)
Definition: item.c:1756
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
M0_INTERNAL int m0_rpc_item_module_init(void)
Definition: item.c:111
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL const uint64_t M0_HA_EPOCH_NONE
Definition: epoch.c:77
uint64_t rs_nr_resend_attempts
Definition: rpc_machine.h:63
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
Definition: item.c:742
uint64_t osr_session_id
Definition: onwire.h:97
static bool opcode_is_dup(uint32_t opcode)
Definition: item.c:97
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
struct m0_mutex * ric_lock
Definition: item.h:552
#define M0_ADDB2_PUSH(id,...)
Definition: addb2.h:261
static bool m0_is_po2(uint64_t val)
Definition: arith.h:153
struct m0_tl ri_compound_items
Definition: item.h:204
static void leave(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
Definition: locality.c:315
struct m0_sm ri_sm
Definition: item.h:181
M0_INTERNAL void m0_rwlock_write_lock(struct m0_rwlock *lock)
Definition: rwlock.c:42
M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
Definition: item.c:1566
int32_t ri_error
Definition: item.h:161
static void rpc_item_xid_unassign(struct m0_rpc_item *item)
Definition: item.c:596
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
Definition: sm.c:781
m0_time_t ri_cache_deadline
Definition: item.h:240
Definition: conf.py:1
M0_INTERNAL struct m0_rpc_item_type * m0_rpc_item_type_lookup(uint32_t opcode)
Definition: item.c:189
static struct m0_addb2_mach * mach
Definition: storage.c:42
struct m0_sm_conf rit_incoming_conf
Definition: item.h:483
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
Definition: item.c:356
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL void m0_rpc_item_type_register(struct m0_rpc_item_type *item_type)
Definition: item.c:154
struct m0_sm_timeout ri_deadline_timeout
Definition: item.h:169
M0_INTERNAL bool m0_sm_addb2_counter_init(struct m0_sm *sm)
Definition: sm.c:891
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
void m0_rpc_item_init(struct m0_rpc_item *item, const struct m0_rpc_item_type *itype)
Definition: item.c:364
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL void m0_rpc_item_sm_fini(struct m0_rpc_item *item)
Definition: item.c:720
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
Definition: item.c:824
struct m0_rpc_chan * c_rpcchan
Definition: conn.h:317
static int item_entered_in_urgent_state(struct m0_sm *mach)
Definition: item.c:980
static void item__on_reply_postprocess(struct m0_rpc_item *item)
Definition: item.c:1435
#define ITEM_ARG(item)
Definition: item.h:618
m0_rpc_item_state
Definition: item.h:58
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_footer_size
Definition: item.c:105
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
M0_INTERNAL void m0_rpc_item_pending_cache_add(struct m0_rpc_item *item)
Definition: item.c:1710
static struct m0_rpc_session * session
Definition: item.c:53
static void addb2_add_rpc_attrs(const struct m0_rpc_item *req)
Definition: item.c:1456
M0_INTERNAL void m0_rwlock_init(struct m0_rwlock *lock)
Definition: rwlock.c:32
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
uint64_t s_xid
Definition: session.h:331
static struct m0_sm_state_descr incoming_item_states[]
Definition: item.c:276
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
Definition: item.c:104
return M0_RC(rc)
M0_INTERNAL int m0_xcode_length(struct m0_xcode_ctx *ctx)
Definition: xcode.c:390
#define equi(a, b)
Definition: misc.h:297
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
Definition: conn.c:1306
M0_INTERNAL void m0_rpc_item_send_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1503
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search(const struct m0_rpc_conn *conn, uint64_t session_id)
Definition: conn.c:760
#define M0_ENTRY(...)
Definition: trace.h:170
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
Definition: item.c:470
uint64_t osr_xid
Definition: onwire.h:105
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
M0_TL_DEFINE(rpcitem, M0_INTERNAL, struct m0_rpc_item)
void m0_rpc_item_fini(struct m0_rpc_item *item)
Definition: item.c:394
int opcode
Definition: crate.c:301
struct m0_tl s_pending_cache
Definition: session.h:368
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
int i
Definition: dir.c:1033
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
Definition: conn.h:281
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_payload_size(const struct m0_rpc_session *session)
Definition: session.c:775
#define PRIu64
Definition: types.h:58
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
uint64_t rs_nr_failed_items
Definition: rpc_machine.h:58
return M0_ERR(-EOPNOTSUPP)
bool ri_xid_assigned_here
Definition: item.h:238
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
M0_INTERNAL void m0_rpc_item_timer_stop(struct m0_rpc_item *item)
Definition: item.c:1018
M0_INTERNAL int m0_rpc_conn_ha_timer_start(struct m0_rpc_conn *conn)
Definition: conn.c:1417
M0_INTERNAL void m0_rpc_item_process_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1464
m0_bcount_t(* rito_payload_size)(const struct m0_rpc_item *item)
Definition: item.h:385
M0_INTERNAL void m0_rpc_item_module_fini(void)
Definition: item.c:137
uint64_t rs_nr_rcvd_items
Definition: rpc_machine.h:56
void(* rito_item_get)(struct m0_rpc_item *item)
Definition: item.h:423
M0_INTERNAL const char * m0_rpc_item_type_name(const struct m0_rpc_item_type *item_type)
Definition: item.c:1202
void m0_rpc_item_cancel(struct m0_rpc_item *item)
Definition: item.c:932
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL void m0_rpc_item_pending_cache_del(struct m0_rpc_item *item)
Definition: item.c:1741
M0_INTERNAL void m0_rpc_frm_item_deadline_passed(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:384
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
M0_INTERNAL void m0_rpc_session_hold_busy(struct m0_rpc_session *session)
Definition: session.c:782
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
Definition: item.c:704
M0_INTERNAL bool m0_rpc_item_xid_check(struct m0_rpc_item *item, struct m0_rpc_item **next)
Definition: item.c:617
uint64_t ri_magic
Definition: item.h:232
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
struct crate_conf * conf
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
struct m0_sm_conf rit_outgoing_conf
Definition: item.h:484
M0_INTERNAL void m0_rpc_conn_ha_timer_stop(struct m0_rpc_conn *conn)
Definition: conn.c:1438
M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
Definition: item.c:1129
static void item_cancel_fi(struct m0_rpc_item *item)
Definition: item.c:841
static void pending_cache_drain(struct m0_rpc_session *session)
Definition: item.c:1693
m0_time_t m0_time_now(void)
Definition: time.c:134
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
Definition: tlist.h:251
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
Definition: item.c:490
M0_INTERNAL void m0_rpc_item_cache_purge(struct m0_rpc_item_cache *ic)
Definition: item.c:1639
M0_INTERNAL void m0_rpc_item_cache_del(struct m0_rpc_item_cache *ic, uint64_t xid)
Definition: item.c:1616
struct m0_rpc_frm * ri_frm
Definition: item.h:226
#define ITEM_FMT
Definition: item.h:617
static int next[]
Definition: cp.c:248
M0_INTERNAL void m0_rpc_session_item_failed(struct m0_rpc_item *item)
Definition: session.c:880
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
Definition: session.c:141
M0_TL_DESCR_DEFINE(rpcitem, "rpc item tlist", M0_INTERNAL, struct m0_rpc_item, ri_field, ri_magic, M0_RPC_ITEM_MAGIC, M0_RPC_ITEM_HEAD_MAGIC)
uint64_t rit_magic
Definition: item.h:482
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
struct m0_rpc_item * ri_reply
Definition: item.h:163
static struct m0_sm_state_descr outgoing_item_states[]
Definition: item.c:205
M0_INTERNAL void m0_rpc_frm_remove_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:626
uint64_t ri_nr_sent_max
Definition: item.h:146
#define M0_POST(cond)
struct m0_rpc_frm rc_frm
#define UINT64_MAX
Definition: types.h:44
uint64_t ri_cookid
Definition: item.h:190
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
Definition: conn.c:1258
M0_INTERNAL bool m0_rpc_item_is_update(const struct m0_rpc_item *item)
Definition: item.c:504
static void item_timedout(struct m0_rpc_item *item)
Definition: item.c:1047
M0_INTERNAL void m0_rwlock_write_unlock(struct m0_rwlock *lock)
Definition: rwlock.c:47
uint64_t osr_session_xid_min
Definition: onwire.h:104
M0_INTERNAL void m0_rpc_item_type_deregister(struct m0_rpc_item_type *item_type)
Definition: item.c:176
struct m0_rpc_conn conn
Definition: fsync.c:96
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
void(* rito_item_put)(struct m0_rpc_item *item)
Definition: item.h:428
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
struct m0_rpc_machine machine
Definition: mdstore.c:58
M0_INTERNAL void m0_rpc_item_pending_cache_fini(struct m0_rpc_session *session)
Definition: item.c:1703
const char * m0_xcode_enum_print(const struct m0_xcode_enum *en, uint64_t val, char *buf)
Definition: enum.c:51
static struct m0_rwlock rpc_item_types_lock
Definition: item.c:89
void m0_addb2_pop(uint64_t id)
Definition: addb2.c:440
#define M0_MOTR_IEM_DESC(_sev_id, _mod_id, _evt_id, _desc,...)
Definition: iem.h:103
static struct m0_tl rpc_item_types_list
Definition: item.c:88
static bool item_reply_received_fi(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1342
M0_INTERNAL const char * m0_rpc_item_opname(const struct m0_rpc_item *item)
Definition: item.c:1195
M0_INTERNAL void m0_rpc_item_pending_cache_init(struct m0_rpc_session *session)
Definition: item.c:1687
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:260
uint64_t rit_flags
Definition: item.h:478
M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
Definition: item.c:516
static uint32_t timeout
Definition: console.c:52
M0_INTERNAL void m0_rpc_item_cache_clear(struct m0_rpc_item_cache *ic)
Definition: item.c:1655
uint32_t sd_flags
Definition: sm.h:378
struct m0_rpc_item_cache s_reply_cache
Definition: session.h:345
M0_INTERNAL int m0_rpc_session_validate(struct m0_rpc_session *session)
Definition: session.c:573
struct m0_rpc_packet * ri_packet
Definition: item.h:223
M0_INTERNAL bool m0_mod_gt(uint64_t x0, uint64_t x1)
Definition: misc.c:171
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
static void item_timer_cb(struct m0_sm_timer *timer)
Definition: item.c:1027
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
Definition: refs.c:44
M0_INTERNAL void m0_rpc_item_xid_min_update(struct m0_rpc_item *item)
Definition: item.c:543
struct m0_rpc_conn_ha_ops rchc_ops
Definition: conn_internal.h:62
static struct m0_rpc_frm * frm
Definition: formation2.c:34
M0_INTERNAL void m0_sm_timeout_fini(struct m0_sm_timeout *to)
Definition: sm.c:705
#define FOOTER_XCODE_OBJ(ptr)
Definition: item.c:109
void m0_rpc_item_put_lock(struct m0_rpc_item *item)
Definition: item.c:454
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL void m0_rpc_item_sent_invoke(struct m0_rpc_item *item)
Definition: item.c:1766
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
Definition: sm.c:610
M0_INTERNAL bool m0_rpc_item_cache_add(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item, m0_time_t deadline)
Definition: item.c:1583
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
Definition: conn_internal.h:58
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
#define M0_IS0(obj)
Definition: misc.h:70
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic, struct m0_mutex *lock)
Definition: item.c:1547
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
static int item_reply_received(struct m0_rpc_item *reply, struct m0_rpc_item **req_out)
Definition: item.c:1306
uint64_t rs_nr_timedout_items
Definition: rpc_machine.h:60
const struct m0_sm_conf outgoing_item_sm_conf
Definition: item.c:59
struct m0_rpc_session * ri_session
Definition: item.h:147
struct m0_tl * ri_itemq
Definition: item.h:225
Definition: sm.h:301
m0_rpc_item_dir
Definition: item.h:109
void m0_rpc_item_cancel_nolock(struct m0_rpc_item *item)
Definition: item.c:867
M0_INTERNAL bool item_is_in_waiting_queue(const struct m0_rpc_item *item, const struct m0_rpc_frm *frm)
Definition: formation2.c:327
m0_bcount_t m0_rpc_item_payload_size(struct m0_rpc_item *item)
Definition: item.c:480
M0_INTERNAL void m0_rwlock_read_lock(struct m0_rwlock *lock)
Definition: rwlock.c:52
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
Definition: conn.c:349
M0_INTERNAL bool m0_rpc_item_cache__invariant(struct m0_rpc_item_cache *ic)
Definition: item.c:1578
M0_INTERNAL void m0_rpc_packet_remove_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
Definition: packet.c:165
M0_INTERNAL void m0_rwlock_fini(struct m0_rwlock *lock)
Definition: rwlock.c:37
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL int m0_rpc_item_dispatch(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_xid_assign(struct m0_rpc_item *item)
Definition: item.c:565
M0_INTERNAL struct m0_rpc_item * m0_rpc_item_cache_lookup(struct m0_rpc_item_cache *ic, uint64_t xid)
Definition: item.c:1631
#define HEADER2_XCODE_OBJ(ptr)
Definition: item.c:108
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_rpc_item_cache s_req_cache
Definition: session.h:351
struct inode * dir
Definition: dir.c:1028
M0_INTERNAL void m0_rwlock_read_unlock(struct m0_rwlock *lock)
Definition: rwlock.c:57
M0_INTERNAL void m0_xcode_ctx_init(struct m0_xcode_ctx *ctx, const struct m0_xcode_obj *obj)
Definition: xcode.c:373
static void rpc_item_cache_del(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item)
Definition: item.c:1606
struct m0_rpc_stats rm_stats
Definition: rpc_machine.h:96
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
Definition: sm.c:566
uint32_t ri_nr_sent
Definition: item.h:183
M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
Definition: conn.c:280
static bool rpc_item_needs_xid(const struct m0_rpc_item *item)
Definition: item.c:531
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
Definition: sm.c:559
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
uint64_t s_session_id
Definition: session.h:309
M0_INTERNAL bool m0_rpc_item_invariant(const struct m0_rpc_item *item)
Definition: item.c:314
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
Definition: item.c:813
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
#define HEADER1_XCODE_OBJ(ptr)
Definition: item.c:107
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL int m0_rpc_item_received(struct m0_rpc_item *item, struct m0_rpc_machine *machine)
Definition: item.c:1208
M0_INTERNAL const char * item_state_name(const struct m0_rpc_item *item)
Definition: item.c:351
M0_INTERNAL void m0_rpc_session_release(struct m0_rpc_session *session)
Definition: session.c:791
int32_t rc
Definition: trigger_fop.h:47
uint64_t ri_ha_epoch
Definition: item.h:136
#define ARRAY_SIZE(a)
Definition: misc.h:45
static int req_replied(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1360
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
Definition: sm.c:577
static struct m0_sm_state_descr states[C_NR]
Definition: sm.c:512
const struct m0_sm_conf * sm_conf
Definition: sm.h:320
struct m0_rpc_conn * s_conn
Definition: session.h:312
static void conn_flag_unset(struct m0_rpc_conn *conn, uint64_t flag)
Definition: conn_internal.h:76
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
Definition: rpc_internal.h:95
Definition: trace.h:478
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
Definition: item.c:1188
void m0_rpc_item_cancel_init(struct m0_rpc_item *item)
Definition: item.c:947
m0_time_t ri_deadline
Definition: item.h:141
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331