23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 54 uint64_t max_rpcs_in_flight);
96 uint64_t max_rpcs_in_flight);
122 .sd_name =
"Initialised",
129 .sd_name =
"Connecting",
140 .sd_name =
"Terminating",
145 .sd_name =
"Terminated",
157 .sd_name =
"Finalised",
186 struct m0_tl *conn_list;
198 conn_list = sender_end ?
204 ok =
_0C(sender_end != recv_end) &&
205 _0C(rpc_conn_tlist_contains(conn_list,
conn)) &&
297 uint64_t max_rpcs_in_flight)
402 uint64_t max_rpcs_in_flight)
414 return M0_RC(-ENOMEM);
442 rpc_conn_tlink_init(
conn);
518 rpc_conn_tlink_fini(
conn);
669 rpc_conn_tlist_del(
conn);
740 rpc_session_tlist_del(
session);
765 (
unsigned long long) session_id);
813 uint64_t max_rpcs_in_flight,
818 M0_ENTRY(
"conn: %p, svc_fid: %p, ep_addr: %s, " 819 "machine: %p max_rpcs_in_flight: %llu",
821 (
unsigned long long)max_rpcs_in_flight);
824 return M0_RC(-EINVAL);
1320 return svc_obj !=
NULL &&
1509 .u.hed_event_rpc = {
1531 #define S_CASE(x) case x: return #x; 1549 #undef M0_TRACE_SUBSYSTEM
static struct m0_rpc_session session0
M0_INTERNAL int m0_rpc_machine_conn_list_dump(struct m0_rpc_machine *machine, int dir)
M0_INTERNAL bool m0_rpc_conn_is_known_dead(const struct m0_rpc_conn *conn)
struct m0_uint128 uuid[1000]
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
M0_INTERNAL bool m0_rpc_conn_is_rcv(const struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_session_fini_locked(struct m0_rpc_session *session)
static struct m0_list list
static const struct m0_sm_conf conn_conf
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
static bool rpc_conn__on_service_event_cb(struct m0_clink *clink)
M0_INTERNAL void m0_uuid_generate(struct m0_uint128 *u)
const m0_time_t M0_TIME_NEVER
static uint64_t tag(uint8_t code, uint64_t id)
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
#define M0_LOG(level,...)
const struct m0_conf_obj_type * m0_conf_fid_type(const struct m0_fid *fid)
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
M0_INTERNAL int m0_rpc_conn_session_list_dump(const struct m0_rpc_conn *conn)
struct m0_tl rm_outgoing_conns
uint64_t rs_nr_ha_timedout_items
uint64_t rs_nr_ha_noted_conns
static void leave(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
M0_INTERNAL void m0_ha_send(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
M0_INTERNAL void m0_conf_obj_put(struct m0_conf_obj *obj)
M0_INTERNAL void m0_rpc_conn_remove_session(struct m0_rpc_session *session)
struct m0_chan rh_conf_cache_ready
struct m0_clink c_conf_exp_clink
void * m0_fop_data(const struct m0_fop *fop)
M0_INTERNAL struct m0_rpc_chan * rpc_chan_get(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_rpcs_in_flight)
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
M0_INTERNAL struct m0 * m0_get(void)
M0_INTERNAL int m0_rpc_rcv_conn_terminate(struct m0_rpc_conn *conn)
static bool conn_flag_is_set(const struct m0_rpc_conn *conn, uint64_t flag)
#define container_of(ptr, type, member)
static void rpc_conn_sessions_cleanup_fail(struct m0_rpc_conn *conn, bool fail)
static const struct m0_rpc_item_ops conn_terminate_item_ops
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
static struct m0_rpc_item * item
struct m0_tl c_item_sources
M0_INTERNAL void m0_rpc_conn_establish_reply_received(struct m0_rpc_item *item)
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_pop(const struct m0_rpc_conn *conn)
struct m0_rpc_chan * c_rpcchan
M0_INTERNAL int m0_rpc_conn_timedwait(struct m0_rpc_conn *conn, uint64_t states, const m0_time_t timeout)
M0_INTERNAL bool m0_rpc_conn_invariant(const struct m0_rpc_conn *conn)
M0_INTERNAL int m0_rpc_conn_terminate(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
struct m0_clink c_conf_ready_clink
M0_INTERNAL bool m0_rpc_session_invariant(const struct m0_rpc_session *session)
struct m0_sm_group rm_sm_grp
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
#define M0_CHECK_EX(cond)
static int session_zero_attach(struct m0_rpc_conn *conn)
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search(const struct m0_rpc_conn *conn, uint64_t session_id)
void(* cho_ha_timer_cb)(struct m0_sm_timer *timer)
M0_INTERNAL void m0_rpc_conn_add_session(struct m0_rpc_conn *conn, struct m0_rpc_session *session)
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
static void __conn_fini(struct m0_rpc_conn *conn)
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
M0_INTERNAL void m0_conf_obj_get(struct m0_conf_obj *obj)
struct m0_rpc_machine * c_rpc_machine
M0_INTERNAL int m0_rpc_conn_establish_sync(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
M0_INTERNAL int m0_rpc_conn_create(struct m0_rpc_conn *conn, struct m0_fid *svc_fid, struct m0_net_end_point *ep, struct m0_rpc_machine *rpc_machine, uint64_t max_rpcs_in_flight, m0_time_t abs_timeout)
#define M0_ERR_INFO(rc, fmt,...)
void(* mw_session_added)(struct m0_rpc_machine_watch *w, struct m0_rpc_session *session)
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
M0_INTERNAL int m0_rpc_conn_ha_timer_start(struct m0_rpc_conn *conn)
M0_INTERNAL void session_state_set(struct m0_rpc_session *session, int state)
static struct m0_rpc_conn_ha_cfg rpc_conn_ha_cfg
M0_INTERNAL struct m0_confc * m0_reqh2confc(struct m0_reqh *reqh)
#define m0_tl_teardown(name, head, obj)
M0_INTERNAL int m0_rpc_conn_ha_subscribe(struct m0_rpc_conn *conn, struct m0_fid *svc_fid)
M0_INTERNAL int m0_rpc__fop_post(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ops, m0_time_t abs_timeout)
static int __conn_init(struct m0_rpc_conn *conn, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, uint64_t max_rpcs_in_flight)
M0_INTERNAL void m0_rpc_conn_ha_timer_stop(struct m0_rpc_conn *conn)
m0_time_t m0_time_now(void)
void m0_sm_state_set(struct m0_sm *mach, int state)
M0_INTERNAL const char * m0_rpc_conn_state_to_str(enum m0_rpc_conn_state state)
enum m0_ha_obj_state co_ha_state
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
static bool rpc_conn__on_cache_expired_cb(struct m0_clink *clink)
int m0_rpc_conn_destroy(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
struct m0_conf_cache cc_cache
M0_INTERNAL int m0_rpc_rcv_conn_init(struct m0_rpc_conn *conn, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, const struct m0_uint128 *uuid)
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
struct m0_net_end_point * rc_destep
const struct m0_rpc_item_type * ri_type
struct m0_rpc_item * ri_reply
static struct m0_sm_state_descr conn_states[]
struct m0_chan co_ha_chan
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_conn_terminate_sync(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
#define M0_CONF_CAST(ptr, type)
M0_INTERNAL bool m0_confc_is_inited(const struct m0_confc *confc)
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_tl rm_incoming_conns
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
struct m0_rpc_machine machine
M0_INTERNAL void m0_rpc_conn_cleanup_all_sessions(struct m0_rpc_conn *conn)
static const struct m0_rpc_item_ops conn_establish_item_ops
static void deregister_all_item_sources(struct m0_rpc_conn *conn)
void(* rio_replied)(struct m0_rpc_item *item)
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL void m0_rpc_conn_terminate_reply_sent(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_conn_fini_locked(struct m0_rpc_conn *conn)
m0_time_t rchc_ha_interval
M0_INTERNAL void conn_state_set(struct m0_rpc_conn *conn, int state)
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
M0_INTERNAL void m0_rpc_session_quiesce(struct m0_rpc_session *session)
struct m0_rpc_conn_ha_ops rchc_ops
struct m0_fop_type m0_rpc_fop_conn_terminate_fopt
struct m0_fop_type m0_rpc_fop_session_establish_fopt
M0_INTERNAL void m0_rpc_conn_ha_cfg_set(struct m0_rpc_conn *conn, const struct m0_rpc_conn_ha_cfg *cfg)
static struct m0_chan chan[RDWR_REQUEST_MAX]
#define M0_FI_ENABLED(tag)
M0_INTERNAL bool m0_tlist_invariant(const struct m0_tl_descr *d, const struct m0_tl *list)
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
struct m0_chan rh_conf_cache_exp
M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL int m0_rpc_session_init_locked(struct m0_rpc_session *session, struct m0_rpc_conn *conn)
struct m0_sm_timer c_ha_timer
struct m0_rpc_session * ri_session
M0_INTERNAL void m0_rpc_conn_terminate_reply_received(struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_conn_establish(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
static void __conn_ha_unsubscribe(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
static void __conn_ha_subscribe(struct m0_rpc_conn *conn)
struct m0_clink c_ha_clink
static struct m0_fop * fop
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
M0_INTERNAL struct m0_rpc_session * m0_rpc_conn_session0(const struct m0_rpc_conn *conn)
static void reqh_service_ha_state_set(struct m0_rpc_conn *conn, uint8_t state)
struct m0_rpc_item_type ft_rpc_item_type
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_conn_init(struct m0_rpc_conn *conn, struct m0_fid *svc_fid, struct m0_net_end_point *ep, struct m0_rpc_machine *machine, uint64_t max_rpcs_in_flight)
#define M0_ASSERT_INFO(cond, fmt,...)
static void conn_flag_set(struct m0_rpc_conn *conn, uint64_t flag)
static bool rpc_conn__on_cache_ready_cb(struct m0_clink *clink)
M0_INTERNAL void m0_rpc_conn_ha_unsubscribe(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_machine_add_conn(struct m0_rpc_machine *rmach, struct m0_rpc_conn *conn)
static void conn_failed(struct m0_rpc_conn *conn, int32_t error)
M0_INTERNAL int m0_rpc_rcv_session_terminate(struct m0_rpc_session *session)
M0_INTERNAL const char * m0_ha_state2str(enum m0_ha_obj_state state)
struct m0_rpc_stats rm_stats
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_conn_sessions_cancel(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
M0_INTERNAL void m0_conf_cache_lock(struct m0_conf_cache *cache)
static struct m0_dtm_oper_descr reply
#define m0_tl_find(name, var, head,...)
M0_INTERNAL bool m0_rpc_item_is_sess_establish(const struct m0_rpc_item *item)
#define m0_tl_for(name, head, obj)
static struct m0_addb2_source * s
void m0_fop_put(struct m0_fop *fop)
M0_INTERNAL bool m0_conf_service_ep_is_known(const struct m0_conf_obj *svc_obj, const char *ep_addr)
M0_INTERNAL void m0_rpc_conn_reset(struct m0_rpc_conn *conn)
struct m0_fop_type m0_rpc_fop_conn_establish_fopt
static struct m0_rpc_machine rpc_machine
static void rpc_conn_ha_timer_cb(struct m0_sm_timer *timer)
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
M0_INTERNAL void m0_rpc_conn_fini(struct m0_rpc_conn *conn)
static struct m0_sm_state_descr states[C_NR]
struct m0_rpc_conn * s_conn
M0_INTERNAL void m0_conf_cache_unlock(struct m0_conf_cache *cache)
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search_and_pop(const struct m0_rpc_conn *conn, uint64_t session_id)
static void session_zero_detach(struct m0_rpc_conn *conn)
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
static int conn_state(const struct m0_rpc_conn *conn)
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
#define m0_tl_forall(name, var, head,...)
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
static struct m0_confc * rpc_conn2confc(const struct m0_rpc_conn *conn)