Motr  M0
conn.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 #include "lib/errno.h"
26 #include "lib/memory.h"
27 #include "lib/misc.h" /* M0_BITS */
28 #include "lib/bitstring.h"
29 #include "lib/arith.h"
30 #include "lib/finject.h"
31 #include "lib/uuid.h"
32 #include "fop/fop.h"
33 #include "module/instance.h" /* m0_get */
34 #include "reqh/reqh.h" /* m0_reqh */
35 #include "rpc/rpc_internal.h"
36 #include "conf/helpers.h" /* m0_conf_service_ep_is_known */
37 #include "conf/obj_ops.h" /* m0_conf_obj_get, m0_conf_obj_put */
38 #include "conf/cache.h" /* m0_conf_cache_lock, m0_conf_cache_unlock */
39 #include "conf/confc.h" /* m0_confc_is_inited */
40 #include "ha/note.h" /* M0_NC_FAILED */
41 #include "ha/msg.h" /* M0_HA_MSG_EVENT_RPC */
42 #include "ha/ha.h" /* m0_ha_send */
43 
52 M0_INTERNAL struct m0_rpc_chan *rpc_chan_get(struct m0_rpc_machine *machine,
53  struct m0_net_end_point *dest_ep,
54  uint64_t max_rpcs_in_flight);
55 M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan);
56 
57 static bool rpc_conn__on_service_event_cb(struct m0_clink *clink);
58 static void rpc_conn_sessions_cleanup_fail(struct m0_rpc_conn *conn, bool fail);
59 static bool rpc_conn__on_cache_expired_cb(struct m0_clink *clink);
60 static bool rpc_conn__on_cache_ready_cb(struct m0_clink *clink);
61 static struct m0_confc *rpc_conn2confc(const struct m0_rpc_conn *conn);
62 static void rpc_conn_ha_timer_cb(struct m0_sm_timer *timer);
63 static void reqh_service_ha_state_set(struct m0_rpc_conn *conn, uint8_t state);
64 
65 enum {
73 };
74 
76  .rchc_ops = {
78  .cho_ha_notify = reqh_service_ha_state_set,
79  },
80  .rchc_ha_interval = RPC_HA_INTERVAL
81 };
82 
86 static int session_zero_attach(struct m0_rpc_conn *conn);
87 
91 static void session_zero_detach(struct m0_rpc_conn *conn);
92 
93 static int __conn_init(struct m0_rpc_conn *conn,
94  struct m0_net_end_point *ep,
95  struct m0_rpc_machine *machine,
96  uint64_t max_rpcs_in_flight);
100 static void __conn_fini(struct m0_rpc_conn *conn);
101 
102 static void conn_failed(struct m0_rpc_conn *conn, int32_t error);
103 
104 static void deregister_all_item_sources(struct m0_rpc_conn *conn);
105 
106 /*
107  * This is sender side item_ops of conn_establish fop.
108  * Receiver side conn_establish fop has different item_ops
109  * rcv_conn_establish_item_ops defined in rpc/session_fops.c
110  */
113 };
114 
117 };
118 
119 static struct m0_sm_state_descr conn_states[] = {
122  .sd_name = "Initialised",
123  .sd_allowed = M0_BITS(M0_RPC_CONN_CONNECTING,
124  M0_RPC_CONN_ACTIVE, /* rcvr side only */
127  },
129  .sd_name = "Connecting",
130  .sd_allowed = M0_BITS(M0_RPC_CONN_ACTIVE,
132  },
133  [M0_RPC_CONN_ACTIVE] = {
134  .sd_name = "Active",
135  .sd_allowed = M0_BITS(M0_RPC_CONN_TERMINATING,
136  M0_RPC_CONN_TERMINATED, /* rcvr side */
138  },
140  .sd_name = "Terminating",
141  .sd_allowed = M0_BITS(M0_RPC_CONN_TERMINATED,
143  },
145  .sd_name = "Terminated",
146  .sd_allowed = M0_BITS(M0_RPC_CONN_INITIALISED,
148  },
149  [M0_RPC_CONN_FAILED] = {
150  .sd_flags = M0_SDF_FAILURE,
151  .sd_name = "Failed",
152  .sd_allowed = M0_BITS(M0_RPC_CONN_INITIALISED,
154  },
156  .sd_flags = M0_SDF_TERMINAL,
157  .sd_name = "Finalised",
158  },
159 };
160 
161 static const struct m0_sm_conf conn_conf = {
162  .scf_name = "Conn states",
163  .scf_nr_states = ARRAY_SIZE(conn_states),
164  .scf_state = conn_states
165 };
166 
167 M0_INTERNAL void conn_state_set(struct m0_rpc_conn *conn, int state)
168 {
169  M0_PRE(conn != NULL);
170 
171  M0_LOG(M0_INFO, "%p[%s] %s -> %s", conn,
172  m0_rpc_conn_is_snd(conn) ? "SENDER" : "RECEIVER",
174  conn_states[state].sd_name);
175  m0_sm_state_set(&conn->c_sm, state);
176 }
177 
183 M0_INTERNAL bool m0_rpc_conn_invariant(const struct m0_rpc_conn *conn)
184 {
185  struct m0_rpc_session *session0;
186  struct m0_tl *conn_list;
187  int s0nr; /* Number of sessions with id == 0 */
188  bool sender_end;
189  bool recv_end;
190  bool ok;
191 
192  if (conn == NULL || conn->c_rpc_machine == NULL)
193  return false;
194 
195  session0 = NULL;
196  sender_end = m0_rpc_conn_is_snd(conn);
197  recv_end = m0_rpc_conn_is_rcv(conn);
198  conn_list = sender_end ?
201  s0nr = 0;
202 
203  /* conditions that should be true irrespective of conn state */
204  ok = _0C(sender_end != recv_end) &&
205  _0C(rpc_conn_tlist_contains(conn_list, conn)) &&
206  _0C(M0_CHECK_EX(m0_tlist_invariant(&rpc_session_tl,
207  &conn->c_sessions))) &&
208  _0C(rpc_session_tlist_length(&conn->c_sessions) ==
209  conn->c_nr_sessions) &&
211  /*
212  * Each connection has exactly one session with id SESSION_ID_0.
213  * From m0_rpc_conn_init() to m0_rpc_conn_fini(), this session0 is
214  * either in IDLE state or BUSY state.
215  */
216  m0_tl_forall(rpc_session, s, &conn->c_sessions,
217  _0C(ergo(s->s_session_id == SESSION_ID_0,
218  ++s0nr &&
219  (session0 = s) && /*'=' is intentional */
221  M0_RPC_SESSION_BUSY))))) &&
222  _0C(session0 != NULL) &&
223  _0C(s0nr == 1);
224  if (!ok)
225  return false;
226  /*
227  * A connection not in ACTIVE or FAILED state has sessions with only
228  * specific states, except for session0.
229  */
231  m0_tl_forall(rpc_session, s, &conn->c_sessions,
232  ergo(s->s_session_id != SESSION_ID_0,
241  if (!_0C(ok))
242  return false;
243 
244  switch (conn_state(conn)) {
246  return _0C(conn->c_sender_id == SENDER_ID_INVALID) &&
247  _0C(conn->c_nr_sessions >= 1) &&
249 
251  return _0C(conn->c_sender_id == SENDER_ID_INVALID) &&
252  _0C(conn->c_nr_sessions >= 1);
253 
254  case M0_RPC_CONN_ACTIVE:
255  return _0C(conn->c_sender_id != SENDER_ID_INVALID) &&
256  _0C(conn->c_nr_sessions >= 1);
257 
259  return _0C(conn->c_nr_sessions >= 1) &&
261 
263  return _0C(conn->c_nr_sessions >= 1) &&
265  _0C(conn->c_sm.sm_rc == 0);
266 
267  case M0_RPC_CONN_FAILED:
268  return _0C(conn->c_sm.sm_rc != 0);
269 
270  default:
271  return false;
272  }
273  /* Should never reach here */
274  M0_ASSERT(0);
275 }
276 
280 M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
281 {
282  return (conn->c_flags & RCF_SENDER_END) == RCF_SENDER_END;
283 }
284 
288 M0_INTERNAL bool m0_rpc_conn_is_rcv(const struct m0_rpc_conn *conn)
289 {
290  return (conn->c_flags & RCF_RECV_END) == RCF_RECV_END;
291 }
292 
293 M0_INTERNAL int m0_rpc_conn_init(struct m0_rpc_conn *conn,
294  struct m0_fid *svc_fid,
295  struct m0_net_end_point *ep,
296  struct m0_rpc_machine *machine,
297  uint64_t max_rpcs_in_flight)
298 {
299  int rc;
300  struct m0_conf_cache *cc;
301 
302  M0_ENTRY("conn=%p, svc_fid=%p ep=%s", conn, svc_fid,
303  ep != NULL ? ep->nep_addr : NULL );
304  M0_PRE(conn != NULL && machine != NULL && ep != NULL);
305 
306  M0_SET0(conn);
307 
309  /*
310  * Lock ordering -
311  * 1. Conf cache lock
312  * 2. RPC machine lock
313  */
314  if (svc_fid != NULL && m0_fid_is_set(svc_fid))
317 
318  if (svc_fid != NULL)
319  conn->c_svc_fid = *svc_fid;
322 
323  rc = __conn_init(conn, ep, machine, max_rpcs_in_flight);
324  if (rc == 0) {
326  &machine->rm_sm_grp);
328  M0_LOG(M0_INFO, "%p INITIALISED \n", conn);
329  }
330 
334 
336  if (svc_fid != NULL && m0_fid_is_set(svc_fid))
338 
339  return M0_RC(rc);
340 }
341 M0_EXPORTED(m0_rpc_conn_init);
342 
343 static struct m0_confc *rpc_conn2confc(const struct m0_rpc_conn *conn)
344 {
345 
347 }
348 
349 M0_INTERNAL struct m0_conf_obj *m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
350 {
351  struct m0_conf_obj *obj;
352 
353  if (!m0_fid_is_set(&conn->c_svc_fid))
354  return NULL;
355 
358  &conn->c_svc_fid);
359  if (obj == NULL)
360  M0_LOG(M0_WARN, FID_F" not found in cache",
361  FID_P(&conn->c_svc_fid));
362  return obj;
363 }
364 
366 {
367  struct m0_conf_obj *svc_obj;
368 
369  M0_ENTRY("conn = %p", conn);
370  if (!m0_fid_is_set(&conn->c_svc_fid))
371  goto leave;
372  svc_obj = m0_conf_cache_lookup(&rpc_conn2confc(conn)->cc_cache,
373  &conn->c_svc_fid);
374  M0_ASSERT_INFO(svc_obj != NULL, "unknown service " FID_F,
375  FID_P(&conn->c_svc_fid));
377  M0_LOG(M0_DEBUG, "svc_fid "FID_F", cs_type=%d", FID_P(&conn->c_svc_fid),
378  M0_CONF_CAST(svc_obj, m0_conf_service)->cs_type);
379  m0_conf_obj_get(svc_obj);
380  m0_clink_add(&svc_obj->co_ha_chan, &conn->c_ha_clink);
381 leave:
382  M0_LEAVE("service fid = "FID_F, FID_P(&conn->c_svc_fid));
383 }
384 
386 {
387  struct m0_conf_obj *svc_obj;
388 
389  if (!m0_fid_is_set(&conn->c_svc_fid) ||
391  return;
392  svc_obj = m0_rpc_conn2svc(conn);
393  M0_ASSERT(svc_obj != NULL);
394  m0_conf_obj_put(svc_obj);
397 }
398 
399 static int __conn_init(struct m0_rpc_conn *conn,
400  struct m0_net_end_point *ep,
401  struct m0_rpc_machine *machine,
402  uint64_t max_rpcs_in_flight)
403 {
404  int rc;
405 
406  M0_ENTRY();
407  M0_PRE(conn != NULL && ep != NULL &&
410 
411  conn->c_rpcchan = rpc_chan_get(machine, ep, max_rpcs_in_flight);
412  if (conn->c_rpcchan == NULL) {
413  M0_SET0(conn);
414  return M0_RC(-ENOMEM);
415  }
416 
419  conn->c_nr_sessions = 0;
421  conn->c_ast_in_progress = false;
422 
424  /*
425  * conf cache lock is held by the caller at this point if
426  * conn->c_svc_fid has a non-zero value. Also in that case, it is
427  * acquired in the correct order(that is, before rpc machine lock).
428  */
437  M0_LOG(M0_DEBUG, "conn %p has subscribed on rconfc events,\
438  fid "FID_F, conn, FID_P(&conn->c_svc_fid));
439  }
440  rpc_session_tlist_init(&conn->c_sessions);
441  item_source_tlist_init(&conn->c_item_sources);
442  rpc_conn_tlink_init(conn);
443 
445  if (rc != 0) {
446  __conn_fini(conn);
447  M0_SET0(conn);
448  }
449  return M0_RC(rc);
450 }
451 
452 M0_INTERNAL void m0_rpc_conn_reset(struct m0_rpc_conn *conn)
453 {
455  struct m0_rpc_session *session0;
456 
460  return;
461  }
464  session0->s_xid = 0;
468 }
469 
471 {
472  struct m0_rpc_session *session;
473  int rc;
474 
475  M0_ENTRY("conn: %p", conn);
477 
478  if (M0_FI_ENABLED("out-of-memory")) return -ENOMEM;
479 
481  if (session == NULL)
482  return M0_ERR(-ENOMEM);
483 
485  if (rc != 0) {
486  m0_free(session);
487  return M0_RC(rc);
488  }
489 
491  /* It is done as there is no need to establish session0 explicitly
492  * and direct transition from INITIALISED => IDLE is not allowed.
493  */
495 
497  return M0_RC(0);
498 }
499 
500 static void __conn_fini(struct m0_rpc_conn *conn)
501 {
502  M0_ENTRY("conn: %p", conn);
503  M0_PRE(conn != NULL);
504  /*
505  * There must be no HA subscription at this point, to prevent
506  * rpc_conn__on_service_event_cb() from being called when the object
507  * is already about to die.
508  */
513 
515 
516  rpc_session_tlist_fini(&conn->c_sessions);
517  item_source_tlist_fini(&conn->c_item_sources);
518  rpc_conn_tlink_fini(conn);
522  M0_LEAVE();
523 }
524 
525 M0_INTERNAL int m0_rpc_rcv_conn_init(struct m0_rpc_conn *conn,
526  struct m0_net_end_point *ep,
527  struct m0_rpc_machine *machine,
528  const struct m0_uint128 *uuid)
529 {
530  int rc;
531 
532  M0_ENTRY("conn: %p, ep_addr: %s, machine: %p", conn,
533  (char *)ep->nep_addr, machine);
534  M0_ASSERT(conn != NULL && ep != NULL);
536 
537  M0_SET0(conn);
538 
540  conn->c_uuid = *uuid;
541 
542  rc = __conn_init(conn, ep, machine, 8 /* max packets in flight */);
543  if (rc == 0) {
545  &machine->rm_sm_grp);
547  M0_LOG(M0_INFO, "%p INITIALISED \n", conn);
548  }
549 
554 
555  return M0_RC(rc);
556 }
557 
558 M0_INTERNAL void m0_rpc_conn_fini(struct m0_rpc_conn *conn)
559 {
560  struct m0_rpc_machine *machine;
561  struct m0_rpc_session *session0;
562 
563  M0_ENTRY("conn: %p", conn);
564  M0_PRE(conn != NULL && conn->c_rpc_machine != NULL);
565 
567 
569 
575  M0_TIME_NEVER);
576 
578  /* Don't look in conn after this point */
580 
581  M0_LEAVE();
582 }
583 M0_EXPORTED(m0_rpc_conn_fini);
584 
585 M0_INTERNAL int m0_rpc_conn_ha_subscribe(struct m0_rpc_conn *conn,
586  struct m0_fid *svc_fid)
587 {
588  struct m0_conf_obj *svc_obj;
589  const char *ep;
590  struct m0_conf_cache *cc;
591 
592  M0_ENTRY("conn %p, svc_fid "FID_F, conn, FID_P(svc_fid));
593  M0_PRE(_0C(conn != NULL) && _0C(svc_fid != NULL));
595  m0_fid_eq(&conn->c_svc_fid, svc_fid));
596 
597  if (M0_IN(conn_state(conn), (M0_RPC_CONN_ACTIVE))) {
598 
599  svc_obj = m0_conf_cache_lookup(&rpc_conn2confc(conn)->cc_cache,
600  svc_fid);
601  if (svc_obj == NULL)
602  return M0_ERR_INFO(-ENOENT, "unknown service " FID_F,
603  FID_P(svc_fid));
604  /*
605  * found service object must match to already established
606  * connection endpoint, i.e. the endpoint must be known to the
607  * service configuration
608  */
610  if (!m0_conf_service_ep_is_known(svc_obj, ep))
611  return M0_ERR_INFO(-EINVAL, "Conn %p ep %s "
612  "unknown to svc_obj " FID_F, conn,
613  ep, FID_P(&svc_obj->co_id));
614  }
615 
617  /*
618  * Lock ordering -
619  * 1. Conf cache lock
620  * 2. RPC machine lock
621  */
624  conn->c_svc_fid = *svc_fid;
628 
629  return M0_RC(0);
630 }
631 
632 M0_INTERNAL void m0_rpc_conn_ha_unsubscribe(struct m0_rpc_conn *conn)
633 {
634  struct m0_conf_cache *cc;
635 
636  M0_PRE(conn != NULL);
637  if (!m0_fid_is_set(&conn->c_svc_fid) ||
638  /*
639  * @todo check if this can be avoided or if it is masking the
640  * real problem here.
641  */
643  return;
644 
646  /*
647  * Lock ordering -
648  * 1. Conf cache lock
649  * 2. RPC machine lock
650  */
657 }
658 
659 M0_INTERNAL void m0_rpc_conn_fini_locked(struct m0_rpc_conn *conn)
660 {
661  M0_ENTRY("conn: %p", conn);
663 
668 
669  rpc_conn_tlist_del(conn);
670  M0_LOG(M0_DEBUG, "rpcmach %p conn %p deleted from %s list",
672  (conn->c_flags & RCF_SENDER_END) ? "outgoing" : "incoming");
674  __conn_fini(conn);
676  m0_sm_fini(&conn->c_sm);
677  M0_LOG(M0_INFO, "%p FINALISED \n", conn);
678  M0_SET0(conn);
679  M0_LEAVE();
680 }
681 
683 {
684  struct m0_rpc_session *session;
685 
686  M0_ENTRY("conn: %p", conn);
687  M0_PRE(conn != NULL);
689 
692 
695  m0_free(session);
696 
697  M0_LEAVE();
698 }
699 
700 M0_INTERNAL int m0_rpc_conn_timedwait(struct m0_rpc_conn *conn,
701  uint64_t states,
702  const m0_time_t timeout)
703 {
704  int rc;
705 
706  M0_ENTRY("conn: %p, abs_timeout: "TIME_F, conn,
707  TIME_P(timeout));
708  M0_PRE(conn != NULL && conn->c_rpc_machine != NULL);
709 
712 
714 
717 
718  return M0_RC(rc ?: conn->c_sm.sm_rc);
719 }
720 M0_EXPORTED(m0_rpc_conn_timedwait);
721 
722 M0_INTERNAL void m0_rpc_conn_add_session(struct m0_rpc_conn *conn,
723  struct m0_rpc_session *session)
724 {
725  struct m0_rpc_machine_watch *watch;
726 
727  rpc_session_tlist_add(&conn->c_sessions, session);
728  conn->c_nr_sessions++;
729 
730  m0_tl_for(rmach_watch, &conn->c_rpc_machine->rm_watch, watch) {
731  if (watch->mw_session_added != NULL)
732  watch->mw_session_added(watch, session);
733  } m0_tl_endfor;
734 }
735 
737 {
739 
740  rpc_session_tlist_del(session);
742 }
743 
749 M0_INTERNAL struct m0_rpc_session *m0_rpc_conn_session0(const struct m0_rpc_conn
750  *conn)
751 {
752  struct m0_rpc_session *session0;
753 
755 
756  M0_ASSERT(session0 != NULL);
757  return session0;
758 }
759 
760 M0_INTERNAL struct m0_rpc_session *m0_rpc_session_search(const struct
761  m0_rpc_conn *conn,
762  uint64_t session_id)
763 {
764  M0_ENTRY("conn: %p, session_id: %llu", conn,
765  (unsigned long long) session_id);
766  M0_ASSERT(conn != NULL);
767 
768  return m0_tl_find(rpc_session, session, &conn->c_sessions,
769  session->s_session_id == session_id);
770 }
771 
773  const struct m0_rpc_conn *conn, uint64_t session_id)
774 {
775  struct m0_rpc_session *session;
776 
777  M0_ENTRY("conn: %p, session_id: %" PRIu64, conn, (uint64_t) session_id);
778  M0_PRE(conn != NULL);
780 
781  session = m0_tl_find(rpc_session, session, &conn->c_sessions,
782  session->s_session_id == session_id);
783  if (session != NULL)
785 
786  M0_LEAVE("session: %p", session);
787  return session;
788 }
789 
790 M0_INTERNAL struct m0_rpc_session *m0_rpc_session_pop(
791  const struct m0_rpc_conn *conn)
792 {
793  struct m0_rpc_session *session;
794 
795  M0_ENTRY("conn: %p", conn);
796  M0_PRE(conn != NULL);
798 
799  session = m0_tl_find(rpc_session, session, &conn->c_sessions,
801 
802  if (session != NULL)
804 
805  M0_LEAVE("session: %p", session);
806  return session;
807 }
808 
809 M0_INTERNAL int m0_rpc_conn_create(struct m0_rpc_conn *conn,
810  struct m0_fid *svc_fid,
811  struct m0_net_end_point *ep,
812  struct m0_rpc_machine *rpc_machine,
813  uint64_t max_rpcs_in_flight,
814  m0_time_t abs_timeout)
815 {
816  int rc;
817 
818  M0_ENTRY("conn: %p, svc_fid: %p, ep_addr: %s, "
819  "machine: %p max_rpcs_in_flight: %llu",
820  conn, svc_fid, (char *)ep->nep_addr, rpc_machine,
821  (unsigned long long)max_rpcs_in_flight);
822 
823  if (M0_FI_ENABLED("fake_error"))
824  return M0_RC(-EINVAL);
825 
826  rc = m0_rpc_conn_init(conn, svc_fid, ep, rpc_machine,
827  max_rpcs_in_flight);
828  if (rc == 0) {
829  rc = m0_rpc_conn_establish_sync(conn, abs_timeout);
830  if (rc != 0)
832  }
833  return M0_RC(rc);
834 }
835 
837  m0_time_t abs_timeout)
838 {
839  int rc;
840 
841  M0_ENTRY();
842 
843  rc = m0_rpc_conn_establish(conn, abs_timeout);
844  if (rc != 0)
845  return M0_RC(rc);
846 
849  M0_TIME_NEVER);
850 
851  M0_POST(M0_IN(conn_state(conn),
853  return M0_RC(rc);
854 }
855 M0_EXPORTED(m0_rpc_conn_establish_sync);
856 
857 M0_INTERNAL int m0_rpc_conn_establish(struct m0_rpc_conn *conn,
858  m0_time_t abs_timeout)
859 {
860  struct m0_fop *fop;
861  struct m0_rpc_session *session_0;
862  struct m0_rpc_machine *machine;
863  int rc;
864 
865  M0_ENTRY("conn: %p", conn);
866  M0_PRE(conn != NULL && conn->c_rpc_machine != NULL);
867 
868  if (M0_FI_ENABLED("fake_error"))
869  return M0_ERR(-EINVAL);
870 
872 
874  if (fop == NULL) {
876  conn_failed(conn, -ENOMEM);
878  return M0_ERR(-ENOMEM);
879  }
880 
882 
886 
887  /* m0_rpc_fop_conn_establish FOP doesn't contain any data. */
888 
889  session_0 = m0_rpc_conn_session0(conn);
890 
892  abs_timeout);
893  if (rc == 0)
895  /*
896  * It is possible that ->rio_replied() was called
897  * and connection is in failed state already.
898  */
901  m0_fop_put(fop);
902 
906 
908 
909  return M0_RC(rc);
910 }
911 M0_EXPORTED(m0_rpc_conn_establish);
912 
916 static void conn_failed(struct m0_rpc_conn *conn, int32_t error)
917 {
918  M0_ENTRY("conn: %p, error: %d", conn, error);
919 
921 
923  M0_LEAVE();
924 }
925 
927 {
929  struct m0_rpc_machine *machine;
930  struct m0_rpc_conn *conn;
931  struct m0_rpc_item *reply_item;
932  int32_t rc;
933 
934  M0_ENTRY("item: %p", item);
935  M0_PRE(item != NULL &&
936  item->ri_session != NULL &&
938 
939  conn = item2conn(item);
941 
946 
948  if (rc == 0) {
949  reply_item = item->ri_reply;
950  M0_ASSERT(reply_item != NULL &&
951  item->ri_session == reply_item->ri_session);
952  reply = m0_fop_data(m0_rpc_item_to_fop(reply_item));
953  rc = reply->rcer_rc;
954  }
955  if (rc == 0) {
956  M0_ASSERT(reply != NULL);
957  if (reply->rcer_sender_id != SENDER_ID_INVALID) {
958  conn->c_sender_id = reply->rcer_sender_id;
960  } else
961  rc = M0_ERR(-EPROTO);
962  }
963  if (rc != 0) {
964  M0_LOG(M0_DEBUG, "rpc item ERROR rc=%d", rc);
965  conn_failed(conn, rc);
966  }
967 
971  M0_LEAVE();
972 }
973 
975 {
976  int rc;
977 
978  M0_ENTRY("conn: %p", conn);
979 
981  rc = m0_rpc_conn_terminate_sync(conn, abs_timeout);
983 
984  return M0_RC(rc);
985 }
986 M0_EXPORTED(m0_rpc_conn_destroy);
987 
989  m0_time_t abs_timeout)
990 {
991  int rc;
992 
993  M0_ENTRY();
994 
995  rc = m0_rpc_conn_terminate(conn, abs_timeout);
996  if (rc != 0)
997  return M0_ERR(rc);
998 
1001  M0_TIME_NEVER);
1002 
1004  M0_RPC_CONN_FAILED)));
1005  return M0_RC(rc);
1006 }
1007 M0_EXPORTED(m0_rpc_conn_terminate_sync);
1008 
1009 M0_INTERNAL int m0_rpc_conn_terminate(struct m0_rpc_conn *conn,
1010  m0_time_t abs_timeout)
1011 {
1013  struct m0_rpc_session *session_0;
1014  struct m0_rpc_machine *machine;
1015  struct m0_fop *fop;
1016  int rc;
1017 
1018  M0_ENTRY("conn: %p", conn);
1019  M0_PRE(conn != NULL);
1021 
1023  if (M0_FI_ENABLED("fail_allocation"))
1024  fop = NULL;
1025  else
1027  machine);
1032 
1034 
1035  if (fop == NULL) {
1036  /* see note [^1] at the end of function */
1037  rc = -ENOMEM;
1038  conn_failed(conn, rc);
1040  return M0_ERR(rc);
1041  }
1043  m0_fop_put(fop);
1045  return M0_RC(0);
1046  }
1047  args = m0_fop_data(fop);
1048  args->ct_sender_id = conn->c_sender_id;
1049 
1051  /*
1052  * Unable to terminate normal way while having other side
1053  * dead. Therefore, fail itself and quit.
1054  */
1055  m0_fop_put(fop);
1056  rc = -ECANCELED;
1057  conn_failed(conn, rc);
1060  return M0_ERR_INFO(rc, "Connection is known to be dead:"
1061  " sender_id=%" PRIu64 " svc_fid="FID_F,
1063  }
1064 
1065  session_0 = m0_rpc_conn_session0(conn);
1066 
1067  /*
1068  * m0_rpc_conn_terminate_reply_received() expects the conn
1069  * to be in M0_RPC_CONN_TERMINATING state. Make sure it is so,
1070  * even if item send below fails.
1071  */
1074  abs_timeout);
1075  /*
1076  * It is possible that ->rio_replied() was called
1077  * and connection is terminated already.
1078  */
1079  if (rc != 0 && conn_state(conn) == M0_RPC_CONN_TERMINATING)
1080  conn_failed(conn, rc);
1081 
1082  m0_fop_put(fop);
1085  /*
1086  * CAUTION: Following assertion is not guaranteed as soon as
1087  * rpc_machine is unlocked.
1088  */
1090 
1092  /* see m0_rpc_conn_terminate_reply_received() */
1093  return M0_RC(rc);
1094 }
1095 M0_EXPORTED(m0_rpc_conn_terminate);
1096 /*
1097  * m0_rpc_conn_terminate [^1]
1098  * There are two choices here:
1099  *
1100  * 1. leave conn in TERMINATING state FOREVER.
1101  * Then when to fini/cleanup conn.
1102  *
1103  * 2. Move conn to FAILED state.
1104  * For this conn the receiver side state will still
1105  * continue to exist. And receiver can send one-way
1106  * items, that will be received on sender i.e. current node.
1107  * Current code will drop such items. When/how to fini and
1108  * cleanup receiver side state? XXX
1109  *
1110  * For now, the latter is chosen. This can be changed in the future
1111  * to alternative 1, if required.
1112  */
1113 
1115 {
1116  struct m0_rpc_item_source *source;
1117 
1119 
1120  m0_tl_teardown(item_source, &conn->c_item_sources, source) {
1121  source->ris_conn = NULL;
1122  source->ris_ops->riso_conn_terminating(source);
1123  }
1124 }
1125 
1127 {
1129  struct m0_rpc_conn *conn;
1130  struct m0_rpc_machine *machine;
1131  struct m0_rpc_item *reply_item;
1132  int32_t rc;
1133 
1134  M0_ENTRY("item: %p", item);
1135  M0_PRE(item != NULL &&
1136  item->ri_session != NULL &&
1138 
1139  conn = item2conn(item);
1144 
1146  if (rc == 0) {
1147  reply_item = item->ri_reply;
1148  M0_ASSERT(reply_item != NULL &&
1149  item->ri_session == reply_item->ri_session);
1150  reply = m0_fop_data(m0_rpc_item_to_fop(reply_item));
1151  rc = reply->ctr_rc;
1152  }
1153  if (rc == 0) {
1154  M0_ASSERT(reply != NULL);
1155  if (conn->c_sender_id == reply->ctr_sender_id)
1157  else
1158  rc = M0_ERR(-EPROTO);
1159  }
1160  if (rc != 0) {
1161  M0_LOG(M0_DEBUG, "rpc item ERROR rc=%d sender_id=%"PRIu64
1162  " svc_fid="FID_F, rc, conn->c_sender_id,
1163  FID_P(&conn->c_svc_fid));
1164  conn_failed(conn, rc);
1165  }
1166 
1169  M0_RPC_CONN_FAILED)));
1171  M0_LEAVE();
1172 }
1173 
1175 {
1177 }
1178 
1188 {
1189  struct m0_rpc_session *session;
1190 
1191  m0_tl_for(rpc_session, &conn->c_sessions, session) {
1193  continue;
1194  M0_LOG(M0_INFO, "Aborting session %llu",
1195  (unsigned long long)session->s_session_id);
1196  if (fail) {
1197  if (!M0_IN(session_state(session),
1202  -ECANCELED);
1206  } else { /* normal cleanup */
1209  M0_TIME_NEVER);
1211  }
1213  } m0_tl_endfor;
1214  M0_POST(rpc_session_tlist_length(&conn->c_sessions) == 1);
1215 }
1216 
1218 {
1219  M0_ENTRY("conn: %p", conn);
1220 
1225 
1226  if (conn->c_nr_sessions > 1)
1230 
1232  /* In-core state will be cleaned up by
1233  m0_rpc_conn_terminate_reply_sent() */
1234  return M0_RC(0);
1235 }
1236 
1238 {
1239  struct m0_rpc_session *session0;
1240 
1241  M0_ENTRY("conn: %p", conn);
1242  M0_ASSERT(conn != NULL);
1246  M0_RPC_CONN_FAILED)));
1247 
1251  m0_free(conn);
1252  } else {
1253  conn->c_ast_in_progress = false;
1254  }
1255  M0_LEAVE();
1256 }
1257 
1258 M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
1259 {
1260  return item->ri_type ==
1262 }
1263 
1264 M0_INTERNAL bool m0_rpc_item_is_sess_establish(const struct m0_rpc_item *item)
1265 {
1266  return item->ri_type ==
1268 }
1269 
1277  int dir)
1278 {
1279  struct m0_tl *list;
1280  struct m0_rpc_conn *conn;
1281 
1283 
1284  m0_tl_for(rpc_conn, list, conn) {
1285  M0_LOG(M0_DEBUG, "rmach %8p conn %8p id %llu state %x dir %s",
1286  machine, conn,
1287  (unsigned long long)conn->c_sender_id,
1288  conn_state(conn),
1289  (conn->c_flags & RCF_SENDER_END)? "S":"R");
1290  } m0_tl_endfor;
1291  return 0;
1292 }
1293 
1294 M0_INTERNAL int m0_rpc_conn_session_list_dump(const struct m0_rpc_conn *conn)
1295 {
1296  struct m0_rpc_session *session;
1297 
1298  m0_tl_for(rpc_session, &conn->c_sessions, session) {
1299  M0_LOG(M0_DEBUG, "session %p id %llu state %x", session,
1300  (unsigned long long)session->s_session_id,
1302  } m0_tl_endfor;
1303  return 0;
1304 }
1305 
1306 M0_INTERNAL const char *m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
1307 {
1308  return conn->c_rpcchan->rc_destep->nep_addr;
1309 }
1310 
1311 M0_INTERNAL bool m0_rpc_conn_is_known_dead(const struct m0_rpc_conn *conn)
1312 {
1313  struct m0_conf_obj *svc_obj;
1314 
1315  M0_PRE(conn != NULL);
1316  if (!m0_fid_is_set(&conn->c_svc_fid))
1317  return false;
1318  svc_obj = m0_conf_cache_lookup(&rpc_conn2confc(conn)->cc_cache,
1319  &conn->c_svc_fid);
1320  return svc_obj != NULL &&
1321  svc_obj->co_ha_state == M0_NC_FAILED;
1322 }
1323 
1325 {
1326  struct m0_rpc_session *session;
1327 
1328  m0_tl_for(rpc_session, &conn->c_sessions, session) {
1329  if (session->s_session_id == SESSION_ID_0 ||
1332  continue;
1334  } m0_tl_endfor;
1335 }
1336 
1343 {
1344  struct m0_rpc_conn *conn = container_of(clink, struct m0_rpc_conn,
1345  c_ha_clink);
1347  struct m0_conf_obj, co_ha_chan);
1348 
1350  M0_LOG(M0_DEBUG, "obj->co_ha_state = %d", obj->co_ha_state);
1351  /*
1352  * Ignore M0_NC_TRANSIENT state to keep items re-sending until service
1353  * gets M0_NC_ONLINE or becomes M0_NC_FAILED finally.
1354  */
1355  if (obj->co_ha_state == M0_NC_FAILED)
1364  return true;
1365 }
1366 
1368 {
1369  struct m0_rpc_conn *conn = container_of(clink, struct m0_rpc_conn,
1371  int rc;
1372 
1373  if (!m0_fid_is_set(&conn->c_svc_fid))
1374  return true;
1375  M0_LOG(M0_DEBUG, "subscribe %p conn to HA, svc_fid "FID_F, conn,
1376  FID_P(&conn->c_svc_fid));
1378  if (rc != 0)
1379  M0_LOG(M0_WARN, "Conn %p failed to subscribe, rc=%d", conn, rc);
1380  M0_POST(M0_IN(rc, (0, -ENOENT)));
1387  return true;
1388 }
1389 
1391 {
1392  struct m0_rpc_conn *conn = container_of(clink, struct m0_rpc_conn,
1395 
1396  if (!m0_fid_is_set(&conn->c_svc_fid) ||
1397  /*
1398  * @todo check if this can be avoided or if it is masking the
1399  * real problem here.
1400  */
1402  return true;
1403  M0_LOG(M0_DEBUG, "unsubscribe %p conn from HA", conn);
1404  /*
1405  * Lock ordering -
1406  * 1. Conf cache lock
1407  * 2. RPC machine lock
1408  */
1414  return true;
1415 }
1416 
1418 {
1419  M0_ENTRY("conn %p", conn);
1421  if (!m0_fid_is_set(&conn->c_svc_fid))
1422  return M0_RC(0); /* there's no point to arm the timer */
1424  return M0_RC(0); /* Already started */
1425  else
1428  return M0_RC(0);
1435  );
1436 }
1437 
1438 M0_INTERNAL void m0_rpc_conn_ha_timer_stop(struct m0_rpc_conn *conn)
1439 {
1442  M0_LOG(M0_DEBUG, "Cancelling HA timer; rpc conn=%p", conn);
1444  }
1445 }
1446 
1459 static void rpc_conn_ha_timer_cb(struct m0_sm_timer *timer)
1460 {
1461  struct m0_rpc_conn *conn;
1462  struct m0_conf_obj *svc_obj;
1463 
1464  M0_ENTRY();
1465  M0_PRE(timer != NULL);
1466 
1467  conn = container_of(timer, struct m0_rpc_conn, c_ha_timer);
1471  svc_obj = m0_rpc_conn2svc(conn);
1472  if (svc_obj != NULL && !conn_flag_is_set(conn, RCF_TRANSIENT_SENT) &&
1473  svc_obj->co_ha_state == M0_NC_ONLINE) {
1476  }
1477  M0_LEAVE();
1478 }
1479 
1480 static void reqh_service_ha_state_set(struct m0_rpc_conn *conn, uint8_t state)
1481 {
1482  struct m0_ha_msg *msg;
1483  uint64_t tag;
1484 
1485  M0_ENTRY("conn %p, svc_fid "FID_F", state %s", conn,
1486  FID_P(&conn->c_svc_fid), m0_ha_state2str(state));
1487 
1490  M0_PRE(M0_IN(state, (M0_NC_TRANSIENT, M0_NC_ONLINE)));
1492 
1493  conn->c_ha_attempts++;
1495  M0_LOG(M0_DEBUG, "Already reported about TRANSIENT");
1496  goto leave;
1497  }
1498 
1499  M0_ALLOC_PTR(msg);
1500  if (msg == NULL) {
1501  M0_LOG(M0_ERROR, "can't allocate memory for msg");
1502  goto leave;
1503  }
1504  *msg = (struct m0_ha_msg){
1505  .hm_fid = conn->c_svc_fid,
1506  .hm_time = m0_time_now(),
1507  .hm_data = {
1508  .hed_type = M0_HA_MSG_EVENT_RPC,
1509  .u.hed_event_rpc = {
1510  .hmr_state = state,
1511  .hmr_attempts = conn->c_ha_attempts
1512  },
1513  },
1514  };
1515  m0_ha_send(m0_get()->i_ha, m0_get()->i_ha_link, msg, &tag);
1516  m0_free(msg);
1518  M0_LOG(M0_DEBUG, "tag=%"PRIu64, tag);
1519 leave:
1520  M0_LEAVE();
1521 }
1522 
1523 M0_INTERNAL void m0_rpc_conn_ha_cfg_set(struct m0_rpc_conn *conn,
1524  const struct m0_rpc_conn_ha_cfg *cfg)
1525 {
1527  conn->c_ha_cfg = cfg;
1529 }
1530 
1531 #define S_CASE(x) case x: return #x;
1532 M0_INTERNAL const char *m0_rpc_conn_state_to_str(enum m0_rpc_conn_state state)
1533 {
1534  switch (state) {
1542  }
1543  M0_LOG(M0_ERROR, "State %d unknown", state);
1544  M0_IMPOSSIBLE("No transcript");
1545  return NULL;
1546 }
1547 #undef S_CASE
1548 
1549 #undef M0_TRACE_SUBSYSTEM
1550 
1552 /*
1553  * Local variables:
1554  * c-indentation-style: "K&R"
1555  * c-basic-offset: 8
1556  * tab-width: 8
1557  * fill-column: 80
1558  * scroll-step: 1
1559  * End:
1560  */
struct m0_fid co_id
Definition: obj.h:208
static struct m0_rpc_session session0
Definition: session.c:37
static struct ctx cc
M0_INTERNAL int m0_rpc_machine_conn_list_dump(struct m0_rpc_machine *machine, int dir)
Definition: conn.c:1276
M0_INTERNAL bool m0_rpc_conn_is_known_dead(const struct m0_rpc_conn *conn)
Definition: conn.c:1311
struct m0_uint128 uuid[1000]
Definition: uuid.c:73
#define M0_PRE(cond)
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
M0_INTERNAL bool m0_rpc_conn_is_rcv(const struct m0_rpc_conn *conn)
Definition: conn.c:288
M0_INTERNAL void m0_rpc_session_fini_locked(struct m0_rpc_session *session)
Definition: session.c:310
static struct m0_list list
Definition: list.c:144
struct m0_fid hm_fid
Definition: msg.h:117
uint64_t c_nr_sessions
Definition: conn.h:301
static const struct m0_sm_conf conn_conf
Definition: conn.c:161
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
#define ergo(a, b)
Definition: misc.h:293
static bool rpc_conn__on_service_event_cb(struct m0_clink *clink)
Definition: conn.c:1342
M0_INTERNAL void m0_uuid_generate(struct m0_uint128 *u)
Definition: uuid.c:44
Definition: sm.h:350
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
Definition: cache.c:106
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
const struct m0_conf_obj_type * m0_conf_fid_type(const struct m0_fid *fid)
Definition: obj.c:368
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
Definition: service.c:156
M0_INTERNAL int m0_rpc_conn_session_list_dump(const struct m0_rpc_conn *conn)
Definition: conn.c:1294
struct m0_tl rm_outgoing_conns
Definition: rpc_machine.h:95
uint64_t rs_nr_ha_timedout_items
Definition: rpc_machine.h:64
uint64_t rs_nr_ha_noted_conns
Definition: rpc_machine.h:65
static void leave(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
Definition: locality.c:315
M0_INTERNAL void m0_ha_send(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
Definition: ha.c:862
M0_INTERNAL void m0_conf_obj_put(struct m0_conf_obj *obj)
Definition: obj_ops.c:205
M0_INTERNAL void m0_rpc_conn_remove_session(struct m0_rpc_session *session)
Definition: conn.c:736
struct m0_chan rh_conf_cache_ready
Definition: reqh.h:205
struct m0_clink c_conf_exp_clink
Definition: conn.h:342
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
M0_INTERNAL struct m0_rpc_chan * rpc_chan_get(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_rpcs_in_flight)
Definition: rpc_machine.c:635
static int error
Definition: mdstore.c:64
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL struct m0 * m0_get(void)
Definition: instance.c:41
M0_INTERNAL int m0_rpc_rcv_conn_terminate(struct m0_rpc_conn *conn)
Definition: conn.c:1217
static bool conn_flag_is_set(const struct m0_rpc_conn *conn, uint64_t flag)
Definition: conn_internal.h:81
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
Definition: ub.c:49
static void rpc_conn_sessions_cleanup_fail(struct m0_rpc_conn *conn, bool fail)
Definition: conn.c:1187
#define TIME_P(t)
Definition: time.h:45
static const struct m0_rpc_item_ops conn_terminate_item_ops
Definition: conn.c:115
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
Definition: fid.c:106
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_tl c_item_sources
Definition: conn.h:314
M0_INTERNAL void m0_rpc_conn_establish_reply_received(struct m0_rpc_item *item)
Definition: conn.c:926
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_pop(const struct m0_rpc_conn *conn)
Definition: conn.c:790
struct m0_rpc_chan * c_rpcchan
Definition: conn.h:317
M0_INTERNAL int m0_rpc_conn_timedwait(struct m0_rpc_conn *conn, uint64_t states, const m0_time_t timeout)
Definition: conn.c:700
static struct foo * obj
Definition: tlist.c:302
M0_INTERNAL bool m0_rpc_conn_invariant(const struct m0_rpc_conn *conn)
Definition: conn.c:183
M0_INTERNAL int m0_rpc_conn_terminate(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:1009
struct m0_clink c_conf_ready_clink
Definition: conn.h:345
M0_INTERNAL bool m0_rpc_session_invariant(const struct m0_rpc_session *session)
Definition: session.c:155
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
uint64_t s_xid
Definition: session.h:331
#define M0_CHECK_EX(cond)
struct m0_sm c_sm
Definition: conn.h:322
return M0_RC(rc)
Definition: sock.c:754
static int session_zero_attach(struct m0_rpc_conn *conn)
Definition: conn.c:470
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
Definition: conn.c:1306
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search(const struct m0_rpc_conn *conn, uint64_t session_id)
Definition: conn.c:760
#define M0_ENTRY(...)
Definition: trace.h:170
void(* cho_ha_timer_cb)(struct m0_sm_timer *timer)
Definition: conn_internal.h:54
M0_INTERNAL void m0_rpc_conn_add_session(struct m0_rpc_conn *conn, struct m0_rpc_session *session)
Definition: conn.c:722
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
#define TIME_F
Definition: time.h:44
static void __conn_fini(struct m0_rpc_conn *conn)
Definition: conn.c:500
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
Definition: conn.h:281
M0_INTERNAL void m0_conf_obj_get(struct m0_conf_obj *obj)
Definition: obj_ops.c:186
#define PRIu64
Definition: types.h:58
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
def args
Definition: addb2db.py:716
M0_INTERNAL int m0_rpc_conn_establish_sync(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:836
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
M0_INTERNAL int m0_rpc_conn_create(struct m0_rpc_conn *conn, struct m0_fid *svc_fid, struct m0_net_end_point *ep, struct m0_rpc_machine *rpc_machine, uint64_t max_rpcs_in_flight, m0_time_t abs_timeout)
Definition: conn.c:809
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
void(* mw_session_added)(struct m0_rpc_machine_watch *w, struct m0_rpc_session *session)
Definition: rpc_machine.h:251
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
M0_INTERNAL int m0_rpc_conn_ha_timer_start(struct m0_rpc_conn *conn)
Definition: conn.c:1417
M0_INTERNAL void session_state_set(struct m0_rpc_session *session, int state)
Definition: session.c:131
uint64_t c_sender_id
Definition: conn.h:269
Definition: trace.h:482
static struct m0_rpc_conn_ha_cfg rpc_conn_ha_cfg
Definition: conn.c:75
M0_INTERNAL struct m0_confc * m0_reqh2confc(struct m0_reqh *reqh)
Definition: reqh.c:753
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
M0_INTERNAL int m0_rpc_conn_ha_subscribe(struct m0_rpc_conn *conn, struct m0_fid *svc_fid)
Definition: conn.c:585
uint64_t c_flags
Definition: conn.h:275
M0_INTERNAL int m0_rpc__fop_post(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ops, m0_time_t abs_timeout)
Definition: session_utils.c:62
static int __conn_init(struct m0_rpc_conn *conn, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, uint64_t max_rpcs_in_flight)
Definition: conn.c:399
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_rpc_conn_ha_timer_stop(struct m0_rpc_conn *conn)
Definition: conn.c:1438
struct m0_tl rm_watch
Definition: rpc_machine.h:124
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
M0_INTERNAL const char * m0_rpc_conn_state_to_str(enum m0_rpc_conn_state state)
Definition: conn.c:1532
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
Definition: session.c:850
bool c_ast_in_progress
Definition: conn.h:333
Definition: tlist.h:251
uint64_t c_magic
Definition: conn.h:336
static bool rpc_conn__on_cache_expired_cb(struct m0_clink *clink)
Definition: conn.c:1390
int m0_rpc_conn_destroy(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:974
struct m0_conf_cache cc_cache
Definition: confc.h:394
M0_INTERNAL int m0_rpc_rcv_conn_init(struct m0_rpc_conn *conn, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, const struct m0_uint128 *uuid)
Definition: conn.c:525
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
Definition: session.c:141
struct m0_net_end_point * rc_destep
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
struct m0_rpc_item * ri_reply
Definition: item.h:163
static struct m0_sm_state_descr conn_states[]
Definition: conn.c:119
Definition: msg.h:115
#define M0_POST(cond)
struct m0_chan co_ha_chan
Definition: obj.h:248
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
Definition: chan.c:310
int32_t sm_rc
Definition: sm.h:336
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
Definition: conn.c:1258
M0_INTERNAL int m0_rpc_conn_terminate_sync(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:988
struct m0_rpc_conn conn
Definition: fsync.c:96
#define M0_CONF_CAST(ptr, type)
Definition: obj.h:780
M0_INTERNAL bool m0_confc_is_inited(const struct m0_confc *confc)
Definition: confc.c:448
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_tl rm_incoming_conns
Definition: rpc_machine.h:94
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
struct m0_rpc_machine machine
Definition: mdstore.c:58
#define FID_P(f)
Definition: fid.h:77
struct m0_sm s_sm
Definition: session.h:325
M0_INTERNAL void m0_rpc_conn_cleanup_all_sessions(struct m0_rpc_conn *conn)
Definition: conn.c:1174
static const struct m0_rpc_item_ops conn_establish_item_ops
Definition: conn.c:111
static void deregister_all_item_sources(struct m0_rpc_conn *conn)
Definition: conn.c:1114
struct m0_uint128 c_uuid
Definition: conn.h:272
static uint32_t timeout
Definition: console.c:52
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
struct m0_tl c_sessions
Definition: conn.h:308
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL void m0_rpc_conn_terminate_reply_sent(struct m0_rpc_conn *conn)
Definition: conn.c:1237
M0_INTERNAL void m0_rpc_conn_fini_locked(struct m0_rpc_conn *conn)
Definition: conn.c:659
m0_time_t rchc_ha_interval
Definition: conn_internal.h:63
M0_INTERNAL void conn_state_set(struct m0_rpc_conn *conn, int state)
Definition: conn.c:167
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
M0_INTERNAL void m0_rpc_session_quiesce(struct m0_rpc_session *session)
Definition: session.c:836
struct m0_rpc_conn_ha_ops rchc_ops
Definition: conn_internal.h:62
const char * sd_name
Definition: sm.h:383
struct m0_fop_type m0_rpc_fop_conn_terminate_fopt
Definition: session_fops.c:122
struct m0_fop_type m0_rpc_fop_session_establish_fopt
Definition: session_fops.c:124
char * ep
Definition: sw.h:132
M0_INTERNAL void m0_rpc_conn_ha_cfg_set(struct m0_rpc_conn *conn, const struct m0_rpc_conn_ha_cfg *cfg)
Definition: conn.c:1523
struct m0_fid c_svc_fid
Definition: conn.h:350
static struct m0_chan chan[RDWR_REQUEST_MAX]
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL bool m0_tlist_invariant(const struct m0_tl_descr *d, const struct m0_tl *list)
Definition: tlist.c:236
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
Definition: sm.c:610
Definition: fid.h:38
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
Definition: conn_internal.h:58
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
struct m0_chan rh_conf_cache_exp
Definition: reqh.h:194
M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan)
Definition: rpc_machine.c:729
uint64_t c_ha_attempts
Definition: conn.h:284
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
M0_INTERNAL int m0_rpc_session_init_locked(struct m0_rpc_session *session, struct m0_rpc_conn *conn)
Definition: session.c:223
struct m0_sm_timer c_ha_timer
Definition: conn.h:291
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_INTERNAL void m0_rpc_conn_terminate_reply_received(struct m0_rpc_item *item)
Definition: conn.c:1126
M0_INTERNAL int m0_rpc_conn_establish(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:857
static void __conn_ha_unsubscribe(struct m0_rpc_conn *conn)
Definition: conn.c:385
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static void __conn_ha_subscribe(struct m0_rpc_conn *conn)
Definition: conn.c:365
struct m0_clink c_ha_clink
Definition: conn.h:339
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
Definition: conn.c:349
M0_INTERNAL struct m0_rpc_session * m0_rpc_conn_session0(const struct m0_rpc_conn *conn)
Definition: conn.c:749
static void reqh_service_ha_state_set(struct m0_rpc_conn *conn, uint8_t state)
Definition: conn.c:1480
struct m0_rpc_item_type ft_rpc_item_type
Definition: fop.h:235
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL int m0_rpc_conn_init(struct m0_rpc_conn *conn, struct m0_fid *svc_fid, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, uint64_t max_rpcs_in_flight)
Definition: conn.c:293
#define M0_ASSERT_INFO(cond, fmt,...)
static void conn_flag_set(struct m0_rpc_conn *conn, uint64_t flag)
Definition: conn_internal.h:71
static bool rpc_conn__on_cache_ready_cb(struct m0_clink *clink)
Definition: conn.c:1367
M0_INTERNAL void m0_rpc_conn_ha_unsubscribe(struct m0_rpc_conn *conn)
Definition: conn.c:632
M0_INTERNAL void m0_rpc_machine_add_conn(struct m0_rpc_machine *rmach, struct m0_rpc_conn *conn)
Definition: rpc_machine.c:608
struct inode * dir
Definition: dir.c:1028
static void conn_failed(struct m0_rpc_conn *conn, int32_t error)
Definition: conn.c:916
M0_INTERNAL int m0_rpc_rcv_session_terminate(struct m0_rpc_session *session)
Definition: session.c:822
M0_INTERNAL const char * m0_ha_state2str(enum m0_ha_obj_state state)
Definition: note.c:433
struct m0_rpc_stats rm_stats
Definition: rpc_machine.h:96
#define M0_FID0
Definition: fid.h:93
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
Definition: sm.c:566
M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
Definition: conn.c:280
M0_INTERNAL void m0_rpc_conn_sessions_cancel(struct m0_rpc_conn *conn)
Definition: conn.c:1324
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
Definition: sm.c:559
M0_INTERNAL void m0_conf_cache_lock(struct m0_conf_cache *cache)
Definition: cache.c:50
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
uint64_t s_session_id
Definition: session.h:309
Definition: net.c:93
M0_INTERNAL bool m0_rpc_item_is_sess_establish(const struct m0_rpc_item *item)
Definition: conn.c:1264
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
void m0_fop_put(struct m0_fop *fop)
Definition: fop.c:177
M0_INTERNAL bool m0_conf_service_ep_is_known(const struct m0_conf_obj *svc_obj, const char *ep_addr)
Definition: helpers.c:197
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL void m0_rpc_conn_reset(struct m0_rpc_conn *conn)
Definition: conn.c:452
struct m0_fop_type m0_rpc_fop_conn_establish_fopt
Definition: session_fops.c:120
static struct m0_rpc_machine rpc_machine
Definition: service_ut.c:63
int32_t rc
Definition: trigger_fop.h:47
static void rpc_conn_ha_timer_cb(struct m0_sm_timer *timer)
Definition: conn.c:1459
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
Definition: sm.c:577
M0_INTERNAL void m0_rpc_conn_fini(struct m0_rpc_conn *conn)
Definition: conn.c:558
static struct m0_sm_state_descr states[C_NR]
Definition: sm.c:512
struct m0_rpc_conn * s_conn
Definition: session.h:312
M0_INTERNAL void m0_conf_cache_unlock(struct m0_conf_cache *cache)
Definition: cache.c:55
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search_and_pop(const struct m0_rpc_conn *conn, uint64_t session_id)
Definition: conn.c:772
struct m0_reqh * rm_reqh
Definition: rpc_machine.h:105
Definition: fop.h:79
m0_rpc_conn_state
Definition: conn.h:50
static void session_zero_detach(struct m0_rpc_conn *conn)
Definition: conn.c:682
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
Definition: rpc_internal.h:95
#define FID_F
Definition: fid.h:75
Definition: trace.h:478
static int conn_state(const struct m0_rpc_conn *conn)
Definition: conn_internal.h:66
#define S_CASE(x)
Definition: conn.c:1531
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331
static struct m0_confc * rpc_conn2confc(const struct m0_rpc_conn *conn)
Definition: conn.c:343