23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 39 #include "rpc/rpc_opcodes_xc.h" 107 #define HEADER1_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_header1_xc, ptr) 108 #define HEADER2_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_header2_xc, ptr) 109 #define FOOTER_XCODE_OBJ(ptr) M0_XCODE_OBJ(m0_rpc_item_footer_xc, ptr) 145 rit_tlink_del_fini(item_type);
157 M0_ENTRY(
"item_type: %p, item_opcode: %s (%u)", item_type,
178 M0_ENTRY(
"item_type: %p", item_type);
182 rit_tlink_del_fini(item_type);
201 M0_LEAVE(
"item_type: %p", item_type);
208 .sd_name =
"UNINITIALISED",
213 .sd_name =
"INITIALISED",
221 .sd_name =
"ENQUEUED",
235 .sd_name =
"SENDING",
248 .sd_name =
"WAITING_FOR_REPLY",
256 .sd_name =
"REPLIED",
279 .sd_name =
"UNINITIALISED",
284 .sd_name =
"INITIALISED",
291 .sd_name =
"ACCEPTED",
298 .sd_name =
"REPLIED",
332 (
req + rply + oneway == 1) &&
380 packet_item_tlink_init(
item);
381 itemq_tlink_init(
item);
382 rpcitem_tlink_init(
item);
384 ric_tlink_init(
item);
385 pending_item_tlink_init(
item);
386 xidl_tlink_init(
item);
413 if (itemq_tlink_is_in(
item))
423 ric_tlink_fini(
item);
424 itemq_tlink_fini(
item);
425 packet_item_tlink_fini(
item);
426 rpcitem_tlink_fini(
item);
428 pending_item_tlink_fini(
item);
429 xidl_tlink_fini(
item);
548 "osr_session_xid_min=%"PRIu64,
560 "osr_session_xid_min=%"PRIu64,
599 "osr_session_xid_min=%" PRIu64 " ri_xid_assigned_here=%d",
605 s_conn->c_rpc_machine));
606 xidl_tlist_del(
item);
636 if ((xid & 0xff) == 0)
647 "RPC items had been cancelled on the other side." 648 " Changing session %p xid from %" PRIu64 " to %" PRIu64 ".",
661 if (xid == sess->
s_xid + 1) {
685 if (cached !=
NULL) {
769 "state=\"%s\" rpc_machine_ep=%s " 772 (rpc_mc ==
NULL ) ?
"NOT_AVAILABLE" :
888 if (!pending_item_tlink_is_in(
item)) {
913 if (packet_item_tlink_is_in(
item)) {
1108 "session state %d does not allow sending",
1187 M0_INTERNAL
const char *
1201 M0_INTERNAL
const char *
1215 uint64_t item_sm_id;
1247 return M0_RC(-ENOENT);
1250 return M0_RC(-ENOENT);
1293 if (
rc == -ESHUTDOWN)
1327 return M0_RC(-EPROTO);
1333 return M0_RC(-EREMOTE);
1366 if (
req->ri_error == -ETIMEDOUT) {
1373 }
else if (
req->ri_error != 0) {
1383 switch (
req->ri_sm.sm_state) {
1420 req->ri_rmachine->rm_stats.rs_nr_dropped_items++;
1443 if (svc_obj ==
NULL)
1522 sess =
req->ri_session;
1541 if (
reply->ri_error != 0)
1594 if (cached ==
NULL) {
1611 ric_tlink_del_fini(
item);
1751 pending_item_tlink_del_fini(
item);
1760 (uint64_t)
req,
req->ri_type->rit_opcode);
1761 req->ri_ops->rio_replied(
req);
1772 #undef M0_TRACE_SUBSYSTEM
static void item_resend(struct m0_rpc_item *item)
static void sm_state(const struct m0_sm_conf *conf, struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
M0_INTERNAL void m0_rpc_item_xid_list_fini(struct m0_rpc_session *session)
static struct m0_mutex lock
m0_time_t ri_resend_interval
M0_INTERNAL bool m0_rpc_conn_is_known_dead(const struct m0_rpc_conn *conn)
static struct m0_be_active_record_domain dummy
#define M0_ALLOC_ARR(arr, nr)
const struct m0_sm_conf incoming_item_sm_conf
enum m0_rpc_item_priority ri_prio
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
M0_INTERNAL void m0_rpc_item_xid_list_init(struct m0_rpc_session *session)
struct m0_sm_timer ri_timer
M0_INTERNAL int m0_rpc_item_timer_start(struct m0_rpc_item *item)
const m0_time_t M0_TIME_NEVER
const struct m0_rpc_item_type_ops * rit_ops
static int item_conn_test(struct m0_rpc_item *item)
static struct io_request req
M0_INTERNAL struct m0_rpc_conn * m0_rpc_machine_find_conn(const struct m0_rpc_machine *machine, const struct m0_rpc_item *item)
M0_INTERNAL void m0_sm_timeout_init(struct m0_sm_timeout *to)
M0_INTERNAL void m0_rpc_item_replied_invoke(struct m0_rpc_item *req)
void m0_rpc_item_put(struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_item_module_init(void)
#define M0_LOG(level,...)
M0_INTERNAL const uint64_t M0_HA_EPOCH_NONE
uint64_t rs_nr_resend_attempts
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
static bool opcode_is_dup(uint32_t opcode)
void m0_rpc_item_get(struct m0_rpc_item *item)
struct m0_mutex * ric_lock
#define M0_ADDB2_PUSH(id,...)
static bool m0_is_po2(uint64_t val)
struct m0_tl ri_compound_items
static void leave(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
M0_INTERNAL void m0_rwlock_write_lock(struct m0_rwlock *lock)
M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
static void rpc_item_xid_unassign(struct m0_rpc_item *item)
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
m0_time_t ri_cache_deadline
M0_INTERNAL struct m0_rpc_item_type * m0_rpc_item_type_lookup(uint32_t opcode)
static struct m0_addb2_mach * mach
struct m0_sm_conf rit_incoming_conf
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
#define container_of(ptr, type, member)
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL void m0_rpc_item_type_register(struct m0_rpc_item_type *item_type)
struct m0_sm_timeout ri_deadline_timeout
M0_INTERNAL bool m0_sm_addb2_counter_init(struct m0_sm *sm)
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
m0_time_t m0_time(uint64_t secs, long ns)
void m0_rpc_item_init(struct m0_rpc_item *item, const struct m0_rpc_item_type *itype)
static struct m0_rpc_item * item
M0_INTERNAL void m0_rpc_item_sm_fini(struct m0_rpc_item *item)
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
struct m0_rpc_chan * c_rpcchan
static int item_entered_in_urgent_state(struct m0_sm *mach)
static void item__on_reply_postprocess(struct m0_rpc_item *item)
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_footer_size
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_pending_cache_add(struct m0_rpc_item *item)
static struct m0_rpc_session * session
static void addb2_add_rpc_attrs(const struct m0_rpc_item *req)
M0_INTERNAL void m0_rwlock_init(struct m0_rwlock *lock)
struct m0_sm_group rm_sm_grp
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
static struct m0_sm_state_descr incoming_item_states[]
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
M0_INTERNAL int m0_xcode_length(struct m0_xcode_ctx *ctx)
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_item_send_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
M0_INTERNAL struct m0_rpc_session * m0_rpc_session_search(const struct m0_rpc_conn *conn, uint64_t session_id)
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
M0_TL_DEFINE(rpcitem, M0_INTERNAL, struct m0_rpc_item)
void m0_rpc_item_fini(struct m0_rpc_item *item)
struct m0_tl s_pending_cache
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_payload_size(const struct m0_rpc_session *session)
struct m0_rpc_machine * c_rpc_machine
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
uint64_t rs_nr_failed_items
return M0_ERR(-EOPNOTSUPP)
bool ri_xid_assigned_here
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
M0_INTERNAL void m0_rpc_item_timer_stop(struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_conn_ha_timer_start(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_item_process_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
m0_bcount_t(* rito_payload_size)(const struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_module_fini(void)
uint64_t rs_nr_rcvd_items
void(* rito_item_get)(struct m0_rpc_item *item)
M0_INTERNAL const char * m0_rpc_item_type_name(const struct m0_rpc_item_type *item_type)
void m0_rpc_item_cancel(struct m0_rpc_item *item)
#define M0_AMB(obj, ptr, field)
M0_INTERNAL void m0_rpc_item_pending_cache_del(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_frm_item_deadline_passed(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
#define m0_tl_teardown(name, head, obj)
M0_INTERNAL void m0_rpc_session_hold_busy(struct m0_rpc_session *session)
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
M0_INTERNAL bool m0_rpc_item_xid_check(struct m0_rpc_item *item, struct m0_rpc_item **next)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
struct m0_sm_conf rit_outgoing_conf
M0_INTERNAL void m0_rpc_conn_ha_timer_stop(struct m0_rpc_conn *conn)
M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
static void item_cancel_fi(struct m0_rpc_item *item)
static void pending_cache_drain(struct m0_rpc_session *session)
m0_time_t m0_time_now(void)
struct m0_rpc_item_header2 ri_header
void m0_sm_state_set(struct m0_sm *mach, int state)
enum m0_ha_obj_state co_ha_state
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
M0_INTERNAL void m0_rpc_item_cache_purge(struct m0_rpc_item_cache *ic)
M0_INTERNAL void m0_rpc_item_cache_del(struct m0_rpc_item_cache *ic, uint64_t xid)
struct m0_rpc_frm * ri_frm
M0_INTERNAL void m0_rpc_session_item_failed(struct m0_rpc_item *item)
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
M0_TL_DESCR_DEFINE(rpcitem, "rpc item tlist", M0_INTERNAL, struct m0_rpc_item, ri_field, ri_magic, M0_RPC_ITEM_MAGIC, M0_RPC_ITEM_HEAD_MAGIC)
const struct m0_rpc_item_type * ri_type
struct m0_rpc_item * ri_reply
static struct m0_sm_state_descr outgoing_item_states[]
M0_INTERNAL void m0_rpc_frm_remove_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
#define m0_cookie_of(cookie, type, field)
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
M0_INTERNAL bool m0_rpc_item_is_update(const struct m0_rpc_item *item)
static void item_timedout(struct m0_rpc_item *item)
M0_INTERNAL void m0_rwlock_write_unlock(struct m0_rwlock *lock)
M0_INTERNAL void m0_rpc_item_type_deregister(struct m0_rpc_item_type *item_type)
void(* rio_sent)(struct m0_rpc_item *item)
void(* rito_item_put)(struct m0_rpc_item *item)
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
struct m0_rpc_machine machine
M0_INTERNAL void m0_rpc_item_pending_cache_fini(struct m0_rpc_session *session)
const char * m0_xcode_enum_print(const struct m0_xcode_enum *en, uint64_t val, char *buf)
static struct m0_rwlock rpc_item_types_lock
void m0_addb2_pop(uint64_t id)
#define M0_MOTR_IEM_DESC(_sev_id, _mod_id, _evt_id, _desc,...)
static struct m0_tl rpc_item_types_list
static bool item_reply_received_fi(struct m0_rpc_item *req, struct m0_rpc_item *reply)
M0_INTERNAL const char * m0_rpc_item_opname(const struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_pending_cache_init(struct m0_rpc_session *session)
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_cache_clear(struct m0_rpc_item_cache *ic)
struct m0_rpc_item_cache s_reply_cache
M0_INTERNAL int m0_rpc_session_validate(struct m0_rpc_session *session)
struct m0_rpc_packet * ri_packet
M0_INTERNAL bool m0_mod_gt(uint64_t x0, uint64_t x1)
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
static void item_timer_cb(struct m0_sm_timer *timer)
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
M0_INTERNAL void m0_rpc_item_xid_min_update(struct m0_rpc_item *item)
struct m0_rpc_conn_ha_ops rchc_ops
M0_INTERNAL void m0_sm_timeout_fini(struct m0_sm_timeout *to)
#define FOOTER_XCODE_OBJ(ptr)
void m0_rpc_item_put_lock(struct m0_rpc_item *item)
#define M0_FI_ENABLED(tag)
M0_INTERNAL void m0_rpc_item_sent_invoke(struct m0_rpc_item *item)
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
M0_INTERNAL bool m0_rpc_item_cache_add(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item, m0_time_t deadline)
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic, struct m0_mutex *lock)
const struct m0_rpc_item_ops * ri_ops
m0_time_t m0_time_from_now(uint64_t secs, long ns)
static int item_reply_received(struct m0_rpc_item *reply, struct m0_rpc_item **req_out)
uint64_t rs_nr_timedout_items
const struct m0_sm_conf outgoing_item_sm_conf
struct m0_rpc_session * ri_session
void m0_rpc_item_cancel_nolock(struct m0_rpc_item *item)
M0_INTERNAL bool item_is_in_waiting_queue(const struct m0_rpc_item *item, const struct m0_rpc_frm *frm)
m0_bcount_t m0_rpc_item_payload_size(struct m0_rpc_item *item)
M0_INTERNAL void m0_rwlock_read_lock(struct m0_rwlock *lock)
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
M0_INTERNAL bool m0_rpc_item_cache__invariant(struct m0_rpc_item_cache *ic)
M0_INTERNAL void m0_rpc_packet_remove_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
M0_INTERNAL void m0_rwlock_fini(struct m0_rwlock *lock)
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_item_dispatch(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_xid_assign(struct m0_rpc_item *item)
M0_INTERNAL struct m0_rpc_item * m0_rpc_item_cache_lookup(struct m0_rpc_item_cache *ic, uint64_t xid)
#define HEADER2_XCODE_OBJ(ptr)
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_rpc_item_cache s_req_cache
M0_INTERNAL void m0_rwlock_read_unlock(struct m0_rwlock *lock)
M0_INTERNAL void m0_xcode_ctx_init(struct m0_xcode_ctx *ctx, const struct m0_xcode_obj *obj)
static void rpc_item_cache_del(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item)
struct m0_rpc_stats rm_stats
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
static bool rpc_item_needs_xid(const struct m0_rpc_item *item)
struct m0_rpc_machine * ri_rmachine
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
static struct m0_dtm_oper_descr reply
#define m0_tl_find(name, var, head,...)
M0_INTERNAL bool m0_rpc_item_invariant(const struct m0_rpc_item *item)
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
#define m0_tl_for(name, head, obj)
#define HEADER1_XCODE_OBJ(ptr)
M0_INTERNAL int m0_rpc_item_received(struct m0_rpc_item *item, struct m0_rpc_machine *machine)
M0_INTERNAL const char * item_state_name(const struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_session_release(struct m0_rpc_session *session)
static int req_replied(struct m0_rpc_item *req, struct m0_rpc_item *reply)
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
static struct m0_sm_state_descr states[C_NR]
const struct m0_sm_conf * sm_conf
struct m0_rpc_conn * s_conn
static void conn_flag_unset(struct m0_rpc_conn *conn, uint64_t flag)
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
void m0_rpc_item_cancel_init(struct m0_rpc_item *item)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)