23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 92 .
bt_name =
"m0_reqh_service_ctx",
103 .sd_name =
"Initializing",
107 .sd_name =
"Initialized",
111 .sd_name =
"Starting",
115 .sd_name =
"Started",
119 .sd_name =
"Stopping",
124 .sd_name =
"Stopped",
145 return _0C(m0_reqh_service_bob_check(
svc)) &&
151 (
svc->rs_ops->rso_start_async !=
NULL ||
152 svc->rs_ops->rso_start !=
NULL)) &&
159 m0_reqh_svc_tlist_contains(&
svc->rs_reqh->rh_services,
svc))) &&
161 M0_IN(m0_reqh_lockers_get(
svc->rs_reqh,
svc->rs_type->rst_key),
199 m0_reqh_service_bob_init(
service);
223 "service_fid="FID_F, state2event[state],
266 if (!m0_reqh_lockers_is_empty(
reqh,
key))
267 m0_reqh_lockers_clear(
reqh,
key);
378 bool run_method =
false;
425 m0_reqh_lockers_clear(
reqh,
key);
463 m0_reqh_svc_tlink_del_fini(
service);
464 m0_reqh_service_bob_fini(
service);
481 m0_reqh_service_type_bob_init(rstype);
483 rstype->
rst_key = m0_reqh_lockers_allot();
484 rstypes_tlink_init_at_tail(rstype, &
rstypes);
492 M0_PRE(rstype !=
NULL && m0_reqh_service_type_bob_check(rstype));
494 rstypes_tlink_del_fini(rstype);
495 m0_reqh_lockers_free(rstype->
rst_key);
496 m0_reqh_service_type_bob_fini(rstype);
501 return rstypes_tlist_length(&
rstypes);
562 return s->rs_sm.sm_state;
585 return M0_RC(result);
657 .sd_name =
"M0_RSC_OFFLINE",
661 .sd_name =
"M0_RSC_ONLINE",
666 .sd_name =
"M0_RSC_CONNECTING",
671 .sd_name =
"M0_RSC_DISCONNECTING",
675 .sd_name =
"M0_RSC_CANCELLED",
681 .
scf_name =
"Service ctx connection states",
704 if (
ctx->sc_svc_event.cl_chan !=
NULL) {
706 ctx->sc_svc_event.cl_chan =
NULL;
710 if (
ctx->sc_process_event.cl_chan !=
NULL) {
712 ctx->sc_process_event.cl_chan =
NULL;
727 &
ctx->sc_rlink_wait);
734 M0_ENTRY(
"ctx=%p '%s' Connect to service '%s' type=%s",
ctx,
778 &
ctx->sc_rlink_wait);
783 M0_ENTRY(
"Disconnecting from service '%s'",
816 rc =
ctx->sc_rlink.rlk_rc;
854 M0_ENTRY(
"Reconnecting to service '%s'",
958 &
ctx->sc_rlink_abort);
986 m0_reqh_service_ctx_bob_fini(
ctx);
1001 M0_ENTRY(
"'%s' ctx of service '%s' state: %d",
1106 switch (
obj->co_ha_state) {
1176 switch (
obj->co_ha_state) {
1213 uint32_t max_rpc_nr_in_flight)
1222 if (rmach !=
NULL) {
1224 addr, max_rpc_nr_in_flight);
1230 ctx->sc_service = svc_obj;
1231 ctx->sc_process = proc_obj;
1233 ctx->sc_fid_process = proc_obj->
co_id;
1234 m0_reqh_service_ctx_bob_init(
ctx);
1243 ctx->sc_rlink_wait.cl_is_oneshot =
true;
1253 uint32_t max_rpc_nr_in_flight,
1266 max_rpc_nr_in_flight);
1289 ret =
M0_AMB(ret, rlink, sc_rlink);
1317 M0_PRE(M0_IN(
ctx->sc_rlink.rlk_conn.c_sm.sm_state,
1322 pools_common_svc_ctx_tlink_del_fini(
ctx);
1338 #undef M0_TRACE_SUBSYSTEM const struct m0_conf_obj_type * m0_conf_obj_type(const struct m0_conf_obj *obj)
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
static bool process_event_handler(struct m0_clink *clink)
static void reqh_service_ctx_state_move(struct m0_reqh_service_ctx *ctx, int state)
M0_INTERNAL uint64_t m0_process(void)
M0_INTERNAL int m0_reqh_service_start_async(struct m0_reqh_service_start_async_ctx *asc)
M0_INTERNAL void m0_reqh_service_cancel_reconnect(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL const char * m0_rpc_link_end_point(const struct m0_rpc_link *rlink)
static void abandoned_ctx_destroy_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
M0_INTERNAL bool m0_buf_is_set(const struct m0_buf *buf)
M0_INTERNAL int m0_reqh_service_start(struct m0_reqh_service *service)
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_lookup(const struct m0_reqh *reqh, const struct m0_fid *fid)
M0_INTERNAL int m0_reqh_service_types_init(void)
int(* rso_start)(struct m0_reqh_service *service)
M0_TL_DECLARE(abandoned_svc_ctxs, M0_EXTERN, struct m0_reqh_service_ctx)
M0_INTERNAL struct m0_locality * m0_locality_get(uint64_t value)
static void reqh_service_ctx_flag_clear(struct m0_reqh_service_ctx *ctx, int flag)
M0_INTERNAL void m0_reqh_service_stop(struct m0_reqh_service *service)
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_conf_ha_service_event_post(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_fid *source_process_fid, const struct m0_fid *source_service_fid, const struct m0_fid *service_fid, uint64_t pid, enum m0_conf_ha_service_event event, enum m0_conf_service_type service_type)
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
enum m0_conf_service_type rst_typecode
const m0_time_t M0_TIME_NEVER
M0_INTERNAL int m0_rpc_link_init(struct m0_rpc_link *rlink, struct m0_rpc_machine *mach, struct m0_fid *svc_fid, const char *ep, uint64_t max_rpcs_in_flight)
static struct m0_sm_group * grp
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
#define M0_LOG(level,...)
M0_INTERNAL void m0_reqh_service_prepare_to_stop(struct m0_reqh_service *service)
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
void m0_console_printf(const char *fmt,...)
static void reqh_service_state_set(struct m0_reqh_service *service, enum m0_reqh_service_state state)
M0_INTERNAL void m0_reqh_service_ctx_subscribe(struct m0_reqh_service_ctx *ctx)
void(* rso_fini)(struct m0_reqh_service *service)
static bool reqh_service_ctx_is_cancelled(struct m0_reqh_service_ctx *ctx)
#define m0_strcaseeq(a, b)
M0_INTERNAL bool m0_reqh_service_ctx_is_connected(const struct m0_reqh_service_ctx *ctx)
static void reqh_service_disconnect_locked(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_rwlock_write_lock(struct m0_rwlock *lock)
static void reqh_service_connect_locked(struct m0_reqh_service_ctx *ctx, m0_time_t deadline)
M0_INTERNAL void m0_reqh_service_ctx_destroy(struct m0_reqh_service_ctx *ctx)
static bool reqh_service_ctx_sm_is_locked(const struct m0_reqh_service_ctx *ctx)
M0_TL_DESCR_DECLARE(abandoned_svc_ctxs, M0_EXTERN)
M0_INTERNAL void m0_rpc_link_fini(struct m0_rpc_link *rlink)
static void reqh_service_reconnect_locked(struct m0_reqh_service_ctx *ctx, const char *addr)
static void reqh_service_ctx_sm_unlock(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL struct m0 * m0_get(void)
M0_TL_DEFINE(rstypes, static, struct m0_reqh_service_type)
M0_INTERNAL m0_time_t m0_rpc__down_timeout(void)
M0_INTERNAL void m0_reqh_service_connect_wait(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
struct m0_rwlock rh_rwlock
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
static bool service_event_handler(struct m0_clink *clink)
M0_INTERNAL int m0_reqh_service_types_length(void)
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
M0_TL_DESCR_DEFINE(rstypes, "reqh service types", static, struct m0_reqh_service_type, rst_linkage, rst_magix, M0_REQH_SVC_TYPE_MAGIC, M0_REQH_SVC_HEAD_MAGIC)
struct m0_reqh_service * sac_service
M0_INTERNAL struct m0_reqh_service_ctx * m0_pools_common_service_ctx_find(const struct m0_pools_common *pc, const struct m0_fid *id, enum m0_conf_service_type type)
static struct m0_pools_common pc
M0_INTERNAL struct m0_reqh_service_type * m0_reqh_service_type_find(const char *sname)
M0_INTERNAL void m0_rwlock_init(struct m0_rwlock *lock)
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
static void reqh_service_started_common(struct m0_reqh *reqh, struct m0_reqh_service *service)
struct m0_buf rs_ss_param
struct m0_reqh_context rctx
static struct m0_sm_ast ast[NR]
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
M0_INTERNAL bool m0_reqh_service_is_registered(const char *sname)
static void reqh_service_ha_event(struct m0_reqh_service *service, enum m0_reqh_service_state state)
M0_INTERNAL void m0_conf_obj_get(struct m0_conf_obj *obj)
M0_INTERNAL int m0_reqh_service_disconnect_wait(struct m0_reqh_service_ctx *ctx)
void(* rso_stop)(struct m0_reqh_service *service)
return M0_ERR(-EOPNOTSUPP)
static struct m0_sm_state_descr service_states[]
#define M0_AMB(obj, ptr, field)
M0_INTERNAL bool m0_fom_domain_is_idle_for(const struct m0_reqh_service *svc)
static const struct socktype stype[]
static void reqh_service_starting_common(struct m0_reqh *reqh, struct m0_reqh_service *service, unsigned key)
M0_INTERNAL int m0_reqh_service_ctx_create(struct m0_conf_obj *svc_obj, enum m0_conf_service_type stype, struct m0_rpc_machine *rmach, const char *addr, uint32_t max_rpc_nr_in_flight, struct m0_reqh_service_ctx **out)
static void reqh_service_ctx_flag_set(struct m0_reqh_service_ctx *ctx, int flag)
M0_INTERNAL void m0_reqh_service_connect(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_reqh_service_quit(struct m0_reqh_service *svc)
M0_INTERNAL const char * m0_conf_service_type2str(enum m0_conf_service_type type)
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
static bool reqh_service_ctx_flag_is_set(const struct m0_reqh_service_ctx *ctx, int flag)
void m0_sm_state_set(struct m0_sm *mach, int state)
struct m0_conf_obj * m0_conf_obj_grandparent(const struct m0_conf_obj *obj)
M0_INTERNAL void m0_reqh_service_fini(struct m0_reqh_service *service)
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
struct m0_fid rs_service_fid
static struct m0_thread t[8]
struct m0_reqh_context * rs_reqh_ctx
M0_INTERNAL bool m0_rpc_link_is_connected(const struct m0_rpc_link *rlink)
M0_INTERNAL bool m0_reqh_service_invariant(const struct m0_reqh_service *svc)
M0_INTERNAL struct m0_reqh * m0_conf_obj2reqh(const struct m0_conf_obj *obj)
M0_INTERNAL void m0_rpc_link_reset(struct m0_rpc_link *rlink)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
M0_INTERNAL bool m0_conf_cache_is_locked(const struct m0_conf_cache *cache)
static struct m0_sm_state_descr service_ctx_states[]
struct m0_chan co_ha_chan
M0_INTERNAL int m0_reqh_service_allocate(struct m0_reqh_service **out, const struct m0_reqh_service_type *stype, struct m0_reqh_context *rctx)
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
M0_INTERNAL int m0_reqh_service_ctx_init(struct m0_reqh_service_ctx *ctx, struct m0_conf_obj *svc_obj, enum m0_conf_service_type stype, struct m0_rpc_machine *rmach, const char *addr, uint32_t max_rpc_nr_in_flight)
struct m0_sm_group * lo_grp
M0_INTERNAL void m0_reqh_service_ctx_unsubscribe(struct m0_reqh_service_ctx *ctx)
static bool reqh_service_context_invariant(const struct m0_reqh_service_ctx *ctx)
static struct m0_tl rstypes
static bool reqh_service_ctx_rlink_cb(struct m0_clink *clink)
M0_INTERNAL void m0_rwlock_write_unlock(struct m0_rwlock *lock)
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
M0_INTERNAL void m0_reqh_service_init(struct m0_reqh_service *service, struct m0_reqh *reqh, const struct m0_fid *fid)
#define M0_CONF_CAST(ptr, type)
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_reqh_service_setup(struct m0_reqh_service **out, struct m0_reqh_service_type *stype, struct m0_reqh *reqh, struct m0_reqh_context *rctx, const struct m0_fid *fid)
static void reqh_service_failed_common(struct m0_reqh *reqh, struct m0_reqh_service *service, unsigned key)
static struct fdmi_ctx ctx
static const struct m0_sm_conf service_ctx_states_conf
static const struct m0_bob_type reqh_svc_ctx
int m0_reqh_service_async_start_simple(struct m0_reqh_service_start_async_ctx *asc)
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
struct m0_clink sc_rlink_wait
struct m0_tl pc_abandoned_svc_ctxs
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
M0_INTERNAL void m0_rpc_link_connect_async(struct m0_rpc_link *rlink, m0_time_t abs_timeout, struct m0_clink *wait_clink)
#define M0_MAGIX_OFFSET(type, field)
static void reqh_service_ctx_sm_lock(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_reqh_service_types_fini(void)
struct m0_clink sc_process_event
static int reqh_service_ctx_state_wait(struct m0_reqh_service_ctx *ctx, int state)
#define M0_FI_ENABLED(tag)
const struct m0_reqh_service_type * rs_type
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
M0_BOB_DEFINE(static, &rstypes_bob, m0_reqh_service_type)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
void(* rso_prepare_to_stop)(struct m0_reqh_service *service)
struct m0_sm_group rh_sm_grp
m0_time_t m0_time_from_now(uint64_t secs, long ns)
const struct m0_sm_conf service_states_conf
M0_INTERNAL void m0_reqh_service_list_print(void)
static struct m0_net_test_service svc
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
struct m0_pools_common * rh_pools
struct m0_sm_ast sc_rlink_ast
M0_INTERNAL void m0_rwlock_read_lock(struct m0_rwlock *lock)
struct m0_clink sc_svc_event
M0_INTERNAL void m0_rwlock_fini(struct m0_rwlock *lock)
M0_INTERNAL void m0_reqh_idle_wait_for(struct m0_reqh *reqh, struct m0_reqh_service *service)
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
static void reqh_service_ctx_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_confc_close(struct m0_conf_obj *obj)
static struct m0_bob_type rstypes_bob
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_rpc_session rlk_sess
M0_INTERNAL void m0_reqh_service_started(struct m0_reqh_service *service)
M0_INTERNAL void m0_rwlock_read_unlock(struct m0_rwlock *lock)
const struct m0_conf_obj_type M0_CONF_PROCESS_TYPE
M0_INTERNAL void m0_reqh_service_disconnect(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL bool m0_fid_is_valid(const struct m0_fid *fid)
#define m0_tl_find(name, var, head,...)
M0_INTERNAL void m0_reqh_service_ctx_fini(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_rpc_link_disconnect_async(struct m0_rpc_link *rlink, m0_time_t abs_timeout, struct m0_clink *wait_clink)
#define m0_tl_for(name, head, obj)
static struct m0_addb2_source * s
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
M0_INTERNAL void m0_bob_type_tlist_init(struct m0_bob_type *bt, const struct m0_tl_descr *td)
static bool service_type_is_valid(enum m0_conf_service_type t)
M0_INTERNAL void m0_reqh_service_ctxs_shutdown_prepare(struct m0_reqh *reqh)
int(* rso_start_async)(struct m0_reqh_service_start_async_ctx *asc)
static struct m0_rwlock rstypes_rwlock
#define m0_tl_exists(name, var, head,...)
const m0_time_t M0_TIME_IMMEDIATELY
static void reqh_service_ctx_destroy_if_abandoned(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_reqh_service_failed(struct m0_reqh_service *service)
const struct m0_reqh_service_ops * rs_ops
static void reqh_service_session_cancel(struct m0_reqh_service_ctx *ctx)
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL struct m0_reqh_service_ctx * m0_reqh_service_ctx_from_session(struct m0_rpc_session *session)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)