Motr  M0
session.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #undef M0_TRACE_SUBSYSTEM
24 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
25 #include "lib/trace.h"
26 #include "lib/errno.h"
27 #include "lib/memory.h"
28 #include "lib/misc.h"
29 #include "lib/bitstring.h"
30 #include "lib/uuid.h"
31 #include "motr/magic.h"
32 #include "fop/fop.h"
33 #include "lib/arith.h" /* M0_CNT_DEC */
34 #include "lib/finject.h"
35 #include "rpc/rpc_internal.h"
36 
45 static void __session_fini(struct m0_rpc_session *session);
46 static void session_failed(struct m0_rpc_session *session, int32_t error);
47 static void session_idle_x_busy(struct m0_rpc_session *session);
48 
57  struct m0_fop sec_fop;
58 
61 };
62 
63 static void session_establish_fop_release(struct m0_ref *ref);
64 
67 };
68 
71 };
72 
73 M0_TL_DESCR_DEFINE(rpc_session, "rpc-sessions", M0_INTERNAL,
74  struct m0_rpc_session, s_link, s_magic, M0_RPC_SESSION_MAGIC,
76 M0_TL_DEFINE(rpc_session, M0_INTERNAL, struct m0_rpc_session);
77 
78 static struct m0_sm_state_descr session_states[] = {
81  .sd_name = "Initialised",
83  M0_RPC_SESSION_IDLE, /* only on rcvr */
86  },
88  .sd_name = "Establishing",
89  .sd_allowed = M0_BITS(M0_RPC_SESSION_IDLE,
91  },
93  .sd_name = "Idle",
94  .sd_allowed = M0_BITS(M0_RPC_SESSION_TERMINATING,
98  },
100  .sd_name = "Busy",
101  .sd_allowed = M0_BITS(M0_RPC_SESSION_IDLE)
102  },
104  .sd_name = "Terminating",
105  .sd_allowed = M0_BITS(M0_RPC_SESSION_TERMINATED,
107  },
109  .sd_name = "Terminated",
110  .sd_allowed = M0_BITS(M0_RPC_SESSION_INITIALISED,
112  },
114  .sd_flags = M0_SDF_FAILURE,
115  .sd_name = "Failed",
116  .sd_allowed = M0_BITS(M0_RPC_SESSION_INITIALISED,
118  },
120  .sd_flags = M0_SDF_TERMINAL,
121  .sd_name = "Finalised",
122  },
123 };
124 
125 static const struct m0_sm_conf session_conf = {
126  .scf_name = "Session states",
127  .scf_nr_states = ARRAY_SIZE(session_states),
128  .scf_state = session_states
129 };
130 
131 M0_INTERNAL void session_state_set(struct m0_rpc_session *session, int state)
132 {
133  M0_PRE(session != NULL);
134 
135  M0_LOG(M0_INFO, "Session %p: %s -> %s", session,
137  session_states[state].sd_name);
138  m0_sm_state_set(&session->s_sm, state);
139 }
140 
141 M0_INTERNAL int session_state(const struct m0_rpc_session *session)
142 {
143  return session->s_sm.sm_state;
144 }
145 
146 M0_INTERNAL struct m0_rpc_machine *
148 {
149  return s->s_conn->c_rpc_machine;
150 }
151 
155 M0_INTERNAL bool m0_rpc_session_invariant(const struct m0_rpc_session *session)
156 {
157  bool ok;
158 
159  ok = _0C(session != NULL) &&
160  _0C(session->s_conn != NULL) &&
161  _0C(ergo(rpc_session_tlink_is_in(session),
162  rpc_session_tlist_contains(&session->s_conn->c_sessions,
163  session))) &&
165  session->s_conn->c_nr_sessions > 0));
166 
167  if (!ok)
168  return false;
169 
170  switch (session_state(session)) {
175 
177  return _0C(session->s_session_id <= SESSION_ID_MAX) &&
179 
180  case M0_RPC_SESSION_IDLE:
184 
185  case M0_RPC_SESSION_BUSY:
186  return _0C(!m0_rpc_session_is_idle(session)) &&
188 
190  return _0C(session->s_sm.sm_rc != 0);
191 
192  default:
193  return _0C(false);
194  }
195  /* Should never reach here */
196  M0_ASSERT(0);
197 }
198 
199 M0_INTERNAL bool m0_rpc_session_is_idle(const struct m0_rpc_session *session)
200 {
201  return session->s_hold_cnt == 0;
202 }
203 
205  struct m0_rpc_conn *conn)
206 {
207  struct m0_rpc_machine *machine;
208  int rc;
209 
210  M0_ENTRY("session: %p, conn: %p", session, conn);
211  M0_PRE(session != NULL && conn != NULL);
212 
214  M0_PRE(machine != NULL);
218 
219  return M0_RC(rc);
220 }
221 M0_EXPORTED(m0_rpc_session_init);
222 
224  struct m0_rpc_conn *conn)
225 {
226  int rc;
227 
228  M0_ENTRY("session: %p, conn: %p", session, conn);
229  M0_PRE(session != NULL && conn != NULL);
231 
232  M0_SET0(session);
233 
235  session->s_conn = conn;
236  session->s_xid = 0;
237  session->s_cancelled = false;
238 
239  rpc_session_tlink_init(session);
249  if (rc == 0)
250  M0_LOG(M0_INFO, "Session %p INITIALISED", session);
251  else
252  M0_LOG(M0_ERROR, "Session %p initialisation failed: %d",
253  session, rc);
256 
257  return M0_RC(rc);
258 }
259 
260 M0_INTERNAL void m0_rpc_session_reset(struct m0_rpc_session *session)
261 {
263 
267  return;
268  }
270  session->s_xid = 0;
271  session->s_cancelled = false;
275 }
276 
284 {
285  M0_ENTRY("session: %p", session);
286 
287  rpc_session_tlink_fini(session);
288 
289  M0_LEAVE();
290 }
291 
292 M0_INTERNAL void m0_rpc_session_fini(struct m0_rpc_session *session)
293 {
294  struct m0_rpc_machine *machine;
295 
296  M0_ENTRY("session: %p", session);
297  M0_PRE(session != NULL &&
298  session->s_conn != NULL &&
300 
302 
306  M0_LEAVE();
307 }
308 M0_EXPORTED(m0_rpc_session_fini);
309 
311 {
312  M0_ENTRY("session %p", session);
317 
322  if (rpc_session_tlink_is_in(session))
328  M0_LOG(M0_INFO, "Session %p FINALISED \n", session);
329  M0_LEAVE();
330 }
331 
333  uint64_t states,
334  const m0_time_t abs_timeout)
335 {
337  int rc;
338 
339  M0_ENTRY("session: %p, abs_timeout: "TIME_F, session,
340  TIME_P(abs_timeout));
341 
344  rc = m0_sm_timedwait(&session->s_sm, states, abs_timeout);
347 
348  return M0_RC(rc ?: session->s_sm.sm_rc);
349 }
350 M0_EXPORTED(m0_rpc_session_timedwait);
351 
353  struct m0_rpc_conn *conn,
354  m0_time_t abs_timeout)
355 {
356  int rc;
357 
358  M0_ENTRY("session: %p, conn: %p", session, conn);
359 
361  if (rc == 0) {
362  rc = m0_rpc_session_establish_sync(session, abs_timeout);
363  if (rc != 0)
365  }
366 
367  return M0_RC(rc);
368 }
369 
371  m0_time_t abs_timeout)
372 {
373  int rc;
374 
375  M0_ENTRY("session: %p", session);
376  rc = m0_rpc_session_establish(session, abs_timeout);
377  if (rc != 0)
378  return M0_RC(rc);
379 
382  M0_TIME_NEVER);
383 
386  return M0_RC(rc);
387 }
388 M0_EXPORTED(m0_rpc_session_establish_sync);
389 
391  m0_time_t abs_timeout)
392 {
393  struct m0_rpc_conn *conn;
394  struct m0_fop *fop;
397  struct m0_rpc_session *session_0;
398  struct m0_rpc_machine *machine;
399  int rc;
400 
401  M0_ENTRY("session: %p", session);
402  M0_PRE(session != NULL);
403 
404  if (M0_FI_ENABLED("fake_error"))
405  return M0_RC(-EINVAL);
406 
408 
409  M0_ALLOC_PTR(ctx);
410  if (ctx == NULL) {
411  rc = M0_ERR(-ENOMEM);
412  } else {
413  ctx->sec_session = session;
414  m0_fop_init(&ctx->sec_fop,
417  rc = m0_fop_data_alloc(&ctx->sec_fop);
418  if (rc != 0) {
419  m0_fop_put_lock(&ctx->sec_fop);
420  }
421  }
422 
424 
427 
428  if (rc != 0) {
431  return M0_RC(rc);
432  }
433 
434  conn = session->s_conn;
435 
437 
438  fop = &ctx->sec_fop;
439  args = m0_fop_data(fop);
440  M0_ASSERT(args != NULL);
441 
442  args->rse_sender_id = conn->c_sender_id;
443 
444  session_0 = m0_rpc_conn_session0(conn);
446  abs_timeout);
447  if (rc == 0) {
449  } else {
451  }
452  m0_fop_put(fop);
453 
456 
458 
459  /* see m0_rpc_session_establish_reply_received() */
460  return M0_RC(rc);
461 }
462 M0_EXPORTED(m0_rpc_session_establish);
463 
474 static void session_failed(struct m0_rpc_session *session, int32_t error)
475 {
483 
485 }
486 
488  *item)
489 {
492  struct m0_rpc_machine *machine;
493  struct m0_rpc_session *session;
494  struct m0_rpc_item *reply_item;
495  struct m0_fop *fop;
496  uint64_t session_id;
497  int32_t rc;
498 
499  M0_ENTRY("item: %p", item);
500  M0_PRE(item != NULL &&
501  item->ri_session != NULL &&
503 
505  ctx = container_of(fop, struct fop_session_establish_ctx, sec_fop);
506  session = ctx->sec_session;
507  M0_ASSERT(session != NULL);
508 
511 
514  "Invalid session state: expected %s, got %s",
517 
519  if (rc == 0) {
520  reply_item = item->ri_reply;
521  M0_ASSERT(reply_item != NULL &&
522  item->ri_session == reply_item->ri_session);
523  reply = m0_fop_data(m0_rpc_item_to_fop(reply_item));
524  rc = reply->rser_rc;
525  }
526  if (rc == 0) {
527  M0_ASSERT(reply != NULL);
528  session_id = reply->rser_session_id;
529  if (session_id > SESSION_ID_MIN &&
530  session_id < SESSION_ID_MAX &&
531  reply->rser_sender_id != SENDER_ID_INVALID) {
532  session->s_session_id = session_id;
534  } else {
535  rc = M0_ERR(-EPROTO);
536  }
537  }
538  if (rc != 0)
540 
545  M0_LEAVE();
546 }
547 
548 static void session_establish_fop_release(struct m0_ref *ref)
549 {
551  struct m0_fop *fop;
552 
553  fop = container_of(ref, struct m0_fop, f_ref);
554  m0_fop_fini(fop);
555  ctx = container_of(fop, struct fop_session_establish_ctx, sec_fop);
556  m0_free(ctx);
557 }
558 
560  m0_time_t abs_timeout)
561 {
562  int rc;
563 
564  M0_ENTRY("session: %p", session);
565 
566  rc = m0_rpc_session_terminate_sync(session, abs_timeout);
568 
569  return M0_RC(rc);
570 }
571 M0_EXPORTED(m0_rpc_session_destroy);
572 
574 {
575  M0_ENTRY();
576  M0_PRE(session != NULL);
577  if (session->s_cancelled)
578  return M0_ERR_INFO(-ECANCELED, "Cancelled session");
581  return M0_ERR_INFO(-EINVAL, "Session state %s is not valid",
584  if (session->s_conn == NULL)
585  return M0_ERR_INFO(-ENOMEDIUM, "Session connection is NULL");
586  if (session->s_conn->c_rpc_machine == NULL)
587  return M0_ERR_INFO(-ENOMEDIUM,
588  "Session connection rpc machine is NULL");
589  return M0_RC(0);
590 }
591 
593  m0_time_t abs_timeout)
594 {
595  int rc;
596 
597  M0_ENTRY("session: %p", session);
601 
602  /* Wait for session to become IDLE */
604  M0_TIME_NEVER);
605 
606  rc = m0_rpc_session_terminate(session, abs_timeout);
607  if (rc == 0) {
608  M0_LOG(M0_DEBUG, "session: %p, wait for termination", session);
612  M0_TIME_NEVER);
613 
617  }
618  return M0_RC(rc);
619 }
620 M0_EXPORTED(m0_rpc_session_terminate_sync);
621 
623  m0_time_t abs_timeout)
624 {
625  struct m0_fop *fop;
627  struct m0_rpc_session *session_0;
628  struct m0_rpc_machine *machine;
629  struct m0_rpc_conn *conn;
630  int rc;
631 
632  M0_ENTRY("session: %p", session);
633  M0_PRE(session != NULL && session->s_conn != NULL);
634 
635  conn = session->s_conn;
637 
639 
643 
646  return M0_RC(0);
647  }
648 
649  if (!M0_FI_ENABLED("fail_allocation"))
651  NULL, machine);
652  else
653  fop = NULL;
654  if (fop == NULL) {
655  rc = M0_ERR(-ENOMEM);
656  /* See [^1] about decision to move session to FAILED state */
658  goto out_unlock;
659  }
660 
661  args = m0_fop_data(fop);
662  args->rst_sender_id = conn->c_sender_id;
663  args->rst_session_id = session->s_session_id;
664 
665  session_0 = m0_rpc_conn_session0(conn);
666 
667  /*
668  * m0_rpc_session_establish_reply_received() expects the session
669  * to be in M0_RPC_SESSION_TERMINATING state. Make sure it is so,
670  * even if item send below fails.
671  */
674  abs_timeout);
675  /*
676  * It is possible that ->rio_replied() was called
677  * and session is terminated already.
678  */
681 
682  m0_fop_put(fop);
683 
684 out_unlock:
687 
689 
690  return M0_RC(rc);
691 }
692 M0_EXPORTED(m0_rpc_session_terminate);
693 /*
694  * m0_rpc_session_terminate
695  * [^1]
696  * There are two choices here:
697  *
698  * 1. leave session in TERMNATING state FOREVER.
699  * Then when to fini/cleanup session.
700  * This will not allow finialising of session, in turn conn,
701  * and rpc_machine can't be finalised.
702  *
703  * 2. Move session to FAILED state.
704  * For this session the receiver side state will still
705  * continue to exist. And receiver can send one-way
706  * items, that will be received on sender i.e. current node.
707  * Current code will drop such items. When/how to fini and
708  * cleanup receiver side state? XXX
709  *
710  * For now, later is chosen. This can be changed in future
711  * to alternative 1, iff required.
712  */
713 
714 
716  *item)
717 {
720  struct m0_rpc_item *reply_item;
721  struct m0_rpc_conn *conn;
722  struct m0_rpc_session *session;
723  struct m0_rpc_machine *machine;
724  uint64_t sender_id;
725  uint64_t session_id;
726  int32_t rc;
727 
728  M0_ENTRY("item: %p", item);
729  M0_PRE(item != NULL &&
730  item->ri_session != NULL &&
732 
733  conn = item2conn(item);
737 
739  sender_id = args->rst_sender_id;
740  session_id = args->rst_session_id;
741  M0_ASSERT(sender_id == conn->c_sender_id);
742  session = m0_rpc_session_search(conn, session_id);
745 
747  if (rc == 0) {
748  reply_item = item->ri_reply;
749  M0_ASSERT(reply_item != NULL &&
750  item->ri_session == reply_item->ri_session);
751  reply = m0_fop_data(m0_rpc_item_to_fop(reply_item));
752  rc = reply->rstr_rc;
753  }
754  if (rc == 0)
756  else
758 
763  M0_LEAVE();
764 }
765 
766 M0_INTERNAL m0_bcount_t
768 {
772 }
773 
774 M0_INTERNAL m0_bcount_t
776 {
780 }
781 
783 {
784  M0_LOG(M0_DEBUG, "session %p %d -> %d", session, session->s_hold_cnt,
785  session->s_hold_cnt + 1);
786 
787  ++session->s_hold_cnt;
789 }
790 
792 {
794  M0_PRE(session->s_hold_cnt > 0);
795 
796  M0_LOG(M0_DEBUG, "session %p %d -> %d", session, session->s_hold_cnt,
797  session->s_hold_cnt - 1);
798 
799  --session->s_hold_cnt;
801 }
802 
805 {
806  int state = session_state(session);
808 
810 
811  if (state == M0_RPC_SESSION_IDLE && !idle) {
813  } else if (state == M0_RPC_SESSION_BUSY && idle) {
815  }
816 
820 }
821 
823 {
824  M0_ENTRY("session: %p", session);
825 
826  M0_PRE(session != NULL);
830 
833  return M0_RC(0);
834 }
835 
837 {
838  M0_ENTRY("session: %p", session);
839 
840  M0_PRE(session != NULL);
844 
847  M0_LEAVE();
848 }
849 
851 {
852  struct m0_rpc_item *item;
853 
857 
858  M0_ENTRY("session %p", session);
860  if (session->s_cancelled)
861  goto leave_unlock;
862  session->s_cancelled = true;
863  m0_tl_for(pending_item, &session->s_pending_cache, item) {
867  } m0_tl_endfor;
868 leave_unlock:
870  M0_POST(pending_item_tlist_is_empty(&session->s_pending_cache));
872  M0_LEAVE("session %p", session);
873 }
874 
876 {
877  return session->s_cancelled;
878 }
879 
880 M0_INTERNAL void m0_rpc_session_item_failed(struct m0_rpc_item *item)
881 {
882  M0_PRE(item != NULL && item->ri_error != 0);
884 
886  M0_ASSERT(item->ri_error != 0);
888  }
889 }
890 
891 M0_INTERNAL const char *
893 {
894  switch (state) {
895 #define S_CASE(x) case x: return #x
904 #undef S_CASE
905  default:
906  M0_LOG(M0_ERROR, "Invalid state: %d", state);
907  return NULL;
908  }
909 }
910 
911 #undef M0_TRACE_SUBSYSTEM
912 
915 /*
916  * Local variables:
917  * c-indentation-style: "K&R"
918  * c-basic-offset: 8
919  * tab-width: 8
920  * fill-column: 80
921  * scroll-step: 1
922  * End:
923  */
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_size(const struct m0_rpc_session *session)
Definition: session.c:767
M0_INTERNAL void m0_rpc_item_xid_list_fini(struct m0_rpc_session *session)
Definition: item.c:1675
#define M0_PRE(cond)
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
M0_INTERNAL void m0_rpc_session_fini_locked(struct m0_rpc_session *session)
Definition: session.c:310
uint64_t c_nr_sessions
Definition: conn.h:301
M0_INTERNAL int m0_rpc_session_terminate_sync(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:592
M0_INTERNAL void m0_rpc_item_xid_list_init(struct m0_rpc_session *session)
Definition: item.c:1669
#define NULL
Definition: misc.h:38
#define ergo(a, b)
Definition: misc.h:293
Definition: sm.h:350
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
M0_INTERNAL int m0_rpc_session_create(struct m0_rpc_session *session, struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: session.c:352
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
M0_INTERNAL void m0_rpc_item_replied_invoke(struct m0_rpc_item *req)
Definition: item.c:1756
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_EXTERN struct m0_rpc_session * m0_rpc_conn_session0(const struct m0_rpc_conn *conn)
Definition: conn.c:749
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
int m0_rpc_session_destroy(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:559
struct m0_sm ri_sm
Definition: item.h:181
M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
Definition: item.c:1566
M0_INTERNAL void m0_rpc_conn_remove_session(struct m0_rpc_session *session)
Definition: conn.c:736
int32_t ri_error
Definition: item.h:161
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
static int error
Definition: mdstore.c:64
static void session_idle_x_busy(struct m0_rpc_session *session)
Definition: session.c:804
m0_rpc_session_state
Definition: session.h:112
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL void m0_rpc_session_fini(struct m0_rpc_session *session)
Definition: session.c:292
uint64_t m0_bcount_t
Definition: types.h:77
static struct m0_rpc_session session
Definition: session.c:36
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
Definition: ub.c:49
#define TIME_P(t)
Definition: time.h:45
static const struct m0_rpc_item_ops session_terminate_item_ops
Definition: session.c:69
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_footer_size
Definition: item.c:105
M0_INTERNAL bool m0_rpc_session_is_cancelled(struct m0_rpc_session *session)
Definition: session.c:875
bool s_cancelled
Definition: session.h:360
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
M0_TL_DEFINE(rpc_session, M0_INTERNAL, struct m0_rpc_session)
M0_INTERNAL bool m0_rpc_session_invariant(const struct m0_rpc_session *session)
Definition: session.c:155
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
static void __session_fini(struct m0_rpc_session *session)
Definition: session.c:283
uint64_t s_xid
Definition: session.h:331
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
Definition: item.c:104
return M0_RC(rc)
#define M0_ASSERT_EX(cond)
struct m0_fop sec_fop
Definition: session.c:57
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search(const struct m0_rpc_conn *conn, uint64_t session_id)
Definition: conn.c:760
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL void m0_rpc_conn_add_session(struct m0_rpc_conn *conn, struct m0_rpc_session *session)
Definition: conn.c:722
uint32_t s_hold_cnt
Definition: session.h:320
struct m0_tl s_pending_cache
Definition: session.h:368
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
#define TIME_F
Definition: time.h:44
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_payload_size(const struct m0_rpc_session *session)
Definition: session.c:775
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
def args
Definition: addb2db.py:716
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
M0_INTERNAL void session_state_set(struct m0_rpc_session *session, int state)
Definition: session.c:131
uint64_t c_sender_id
Definition: conn.h:269
Definition: trace.h:482
Definition: refs.h:34
M0_INTERNAL int m0_rpc_session_init(struct m0_rpc_session *session, struct m0_rpc_conn *conn)
Definition: session.c:204
void(* idle)(const struct m0_addb2_mach *mach)
Definition: common.c:34
M0_INTERNAL int m0_rpc__fop_post(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ops, m0_time_t abs_timeout)
Definition: session_utils.c:62
M0_INTERNAL void m0_rpc_session_hold_busy(struct m0_rpc_session *session)
Definition: session.c:782
M0_INTERNAL int m0_rpc_session_terminate(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:622
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
static const struct m0_rpc_item_ops session_establish_item_ops
Definition: session.c:65
#define S_CASE(x)
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
Definition: session.c:850
uint32_t rm_min_recv_size
Definition: rpc_machine.h:148
M0_INTERNAL void m0_rpc_session_item_failed(struct m0_rpc_item *item)
Definition: session.c:880
static void session_failed(struct m0_rpc_session *session, int32_t error)
Definition: session.c:474
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
Definition: session.c:141
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL int m0_rpc_session_timedwait(struct m0_rpc_session *session, uint64_t states, const m0_time_t abs_timeout)
Definition: session.c:332
#define M0_POST(cond)
M0_INTERNAL struct m0_rpc_machine * session_machine(const struct m0_rpc_session *s)
Definition: session.c:147
int32_t sm_rc
Definition: sm.h:336
M0_INTERNAL void m0_rpc_session_reset(struct m0_rpc_session *session)
Definition: session.c:260
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
Definition: fop.c:71
struct m0_rpc_conn conn
Definition: fsync.c:96
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
struct m0_mutex s_lock
Definition: sm.h:514
struct m0_rpc_session * sec_session
Definition: session.c:60
struct m0_rpc_machine machine
Definition: mdstore.c:58
M0_INTERNAL void m0_rpc_item_pending_cache_fini(struct m0_rpc_session *session)
Definition: item.c:1703
static struct fdmi_ctx ctx
Definition: main.c:80
struct m0_sm s_sm
Definition: session.h:325
M0_INTERNAL void m0_rpc_item_pending_cache_init(struct m0_rpc_session *session)
Definition: item.c:1687
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
struct m0_tl c_sessions
Definition: conn.h:308
uint32_t sd_flags
Definition: sm.h:378
struct m0_rpc_item_cache s_reply_cache
Definition: session.h:345
M0_INTERNAL int m0_rpc_session_validate(struct m0_rpc_session *session)
Definition: session.c:573
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
M0_INTERNAL void m0_rpc_session_quiesce(struct m0_rpc_session *session)
Definition: session.c:836
const char * sd_name
Definition: sm.h:383
struct m0_fop_type m0_rpc_fop_session_establish_fopt
Definition: session_fops.c:124
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
struct m0_ref f_ref
Definition: fop.h:80
M0_INTERNAL void m0_rpc_session_establish_reply_received(struct m0_rpc_item *item)
Definition: session.c:487
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic, struct m0_mutex *lock)
Definition: item.c:1547
M0_INTERNAL int m0_rpc_session_init_locked(struct m0_rpc_session *session, struct m0_rpc_conn *conn)
Definition: session.c:223
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_TL_DESCR_DEFINE(rpc_session, "rpc-sessions", M0_INTERNAL, struct m0_rpc_session, s_link, s_magic, M0_RPC_SESSION_MAGIC, M0_RPC_SESSION_HEAD_MAGIC)
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL m0_bcount_t m0_rpc_packet_onwire_footer_size(void)
Definition: packet.c:68
void m0_rpc_item_cancel_nolock(struct m0_rpc_item *item)
Definition: item.c:867
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
static void session_establish_fop_release(struct m0_ref *ref)
Definition: session.c:548
M0_INTERNAL int m0_rpc_session_establish(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:390
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_rpc_item_cache s_req_cache
Definition: session.h:351
M0_INTERNAL bool m0_rpc_session_is_idle(const struct m0_rpc_session *session)
Definition: session.c:199
M0_INTERNAL const char * m0_rpc_session_state_to_str(enum m0_rpc_session_state state)
Definition: session.c:892
static struct m0_sm_state_descr session_states[]
Definition: session.c:78
Definition: nucleus.c:42
M0_INTERNAL void m0_rpc_session_terminate_reply_received(struct m0_rpc_item *item)
Definition: session.c:715
static const struct m0_sm_conf session_conf
Definition: session.c:125
M0_INTERNAL int m0_rpc_rcv_session_terminate(struct m0_rpc_session *session)
Definition: session.c:822
M0_INTERNAL int m0_rpc_session_establish_sync(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:370
struct m0_fop_type m0_rpc_fop_session_terminate_fopt
Definition: session_fops.c:126
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
uint64_t s_session_id
Definition: session.h:309
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
void m0_fop_put(struct m0_fop *fop)
Definition: fop.c:177
M0_INTERNAL m0_bcount_t m0_rpc_packet_onwire_header_size(void)
Definition: packet.c:54
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL void m0_rpc_session_release(struct m0_rpc_session *session)
Definition: session.c:791
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
static struct m0_sm_state_descr states[C_NR]
Definition: sm.c:512
struct m0_rpc_conn * s_conn
Definition: session.h:312
Definition: fop.h:79
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
Definition: rpc_internal.h:95
static int conn_state(const struct m0_rpc_conn *conn)
Definition: conn_internal.h:66
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331