29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 62 uint64_t max_packets_in_flight);
134 M0_ENTRY(
"machine: %p, net_dom: %p, ep_addr: %s, " 164 colour, msg_size, queue_len);
191 m0_rpc_machine_bob_init(
machine);
194 m0_reqh_rpc_mach_tlink_init_at_tail(
machine,
203 m0_reqh_rpc_mach_tlink_del_fini(
machine);
212 m0_rpc_machine_bob_fini(
machine);
258 rmach_watch_tlink_del_fini(watch);
390 M0_LEAVE(
"RPC worker thread STOPPED");
405 uint32_t max_per_source)
421 while (sent < max_per_source &&
463 uint32_t min_recv_size;
465 M0_ENTRY(
"tm: %p, net_dom: %p, ep_addr: %s", tm, net_dom,
511 rc =
rc ?: -ENETUNREACH;
623 rpc_conn_tlist_add(tlist,
conn);
637 uint64_t max_packets_in_flight)
642 (
unsigned long long)max_packets_in_flight);
676 if (
chan->rc_destep == dest_ep) {
691 uint64_t max_packets_in_flight)
710 ch->rc_destep = dest_ep;
758 rpc_chan_tlist_del(
chan);
795 M0_ENTRY(
"net_buf: %p, offset: %llu, length: %llu," 796 "ep_addr: %s", nb, (
unsigned long long)
offset,
797 (
unsigned long long)length, (
char *)from_ep->
nep_addr);
840 const struct m0_tl *conn_list;
878 " from ep_addr: %s, oneway = %d",
991 rmach_watch_tlink_init_at_tail(watch, &rmach->
rm_watch);
1009 if (rmach_watch_tlink_is_in(watch))
1010 rmach_watch_tlink_del_fini(watch);
1016 #undef M0_TRACE_SUBSYSTEM M0_INTERNAL int m0_uint128_cmp(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
M0_INTERNAL void rpc_chan_put(struct m0_rpc_chan *chan)
M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
struct m0_rpc_machine * mw_mach
static struct m0_addb2_philter p
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
static int __rpc_machine_init(struct m0_rpc_machine *machine)
struct m0_net_transfer_mc * nb_tm
void m0_rpc_machine_fini(struct m0_rpc_machine *machine)
struct m0_chan rm_nb_idle
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
struct m0_net_buffer_pool * nb_pool
uint64_t fc_max_nr_packets_enqed
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
struct m0_rpc_conn * h_conn
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
static void item_received(struct m0_rpc_item *item, struct m0_net_end_point *from_ep)
void(* mw_conn_added)(struct m0_rpc_machine_watch *w, struct m0_rpc_conn *conn)
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
struct m0_bufvec nb_buffer
M0_INTERNAL uint32_t m0_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
int m0_thread_join(struct m0_thread *q)
static const char * ep_addr
const struct m0_net_xprt m0_net_lnet_xprt
M0_INTERNAL int m0_rpc_packet_decode(struct m0_rpc_packet *p, struct m0_bufvec *bufvec, m0_bindex_t off, m0_bcount_t len)
M0_INTERNAL struct m0_rpc_conn * m0_rpc_machine_find_conn(const struct m0_rpc_machine *machine, const struct m0_rpc_item *item)
void m0_rpc_item_put(struct m0_rpc_item *item)
#define M0_LOG(level,...)
static void machine_nb_idle(struct m0_rpc_machine *machine)
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
struct m0_tl rm_outgoing_conns
enum m0_net_tm_state ntm_state
void m0_rpc_item_get(struct m0_rpc_item *item)
#define M0_ADDB2_PUSH(id,...)
struct m0_net_domain * ntm_dom
#define m0_exists(var, nr,...)
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
M0_INTERNAL int m0_rpc_rcv_conn_terminate(struct m0_rpc_conn *conn)
#define container_of(ptr, type, member)
static struct m0_net_tm_callbacks m0_rpc_tm_callbacks
struct m0_net_buffer * nbe_buffer
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
static struct m0_rpc_item * item
struct m0_tl c_item_sources
M0_INTERNAL int m0_net_tm_pool_attach(struct m0_net_transfer_mc *tm, struct m0_net_buffer_pool *bufpool, const struct m0_net_buffer_callbacks *callbacks, m0_bcount_t min_recv_size, uint32_t max_recv_msgs, uint32_t min_recv_queue_len)
struct m0_net_end_point * nbe_ep
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
static void rpc_tm_cleanup(struct m0_rpc_machine *machine)
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
struct m0_sm_group rm_sm_grp
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
M0_INTERNAL int m0_pagesize_get(void)
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
M0_INTERNAL void m0_rpc_packet_init(struct m0_rpc_packet *p, struct m0_rpc_machine *rmach)
static int rpc_tm_setup(struct m0_net_transfer_mc *tm, struct m0_net_domain *net_dom, const char *ep_addr, struct m0_net_buffer_pool *pool, uint32_t colour, m0_bcount_t msg_size, uint32_t qlen)
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
void m0_rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm, struct m0_rpc_frm_constraints *constraints, const struct m0_rpc_frm_ops *ops)
M0_INTERNAL void m0_rpc_packet_fini(struct m0_rpc_packet *p)
struct m0_rpc_machine * c_rpc_machine
bool m0_time_is_in_past(m0_time_t t)
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
#define M0_ERR_INFO(rc, fmt,...)
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
return M0_ERR(-EOPNOTSUPP)
m0_bcount_t rm_bulk_cutoff
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
m0_bcount_t fc_max_packet_size
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
struct m0_net_transfer_mc rm_tm
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
uint64_t rs_nr_rcvd_bytes
static void packet_received(struct m0_rpc_packet *p, struct m0_rpc_machine *machine, struct m0_net_end_point *from_ep)
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
M0_INTERNAL void m0_rpc_fop_conn_establish_ctx_init(struct m0_rpc_item *item, struct m0_net_end_point *ep)
const struct m0_net_xprt * nd_xprt
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
M0_TL_DESCR_DEFINE(rpc_chan, "rpc_channels", static, struct m0_rpc_chan, rc_linkage, rc_magic, M0_RPC_CHAN_MAGIC, M0_RPC_CHAN_HEAD_MAGIC)
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
m0_time_t m0_time_now(void)
M0_INTERNAL m0_bcount_t m0_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
struct m0_rpc_item_header2 ri_header
uint64_t fc_max_nr_segments
uint32_t rm_min_recv_size
struct m0_tl rh_rpc_machines
void m0_thread_fini(struct m0_thread *q)
static void rpc_chan_ref_release(struct m0_ref *ref)
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
M0_INTERNAL int m0_rpc_machine_init(struct m0_rpc_machine *machine, struct m0_net_domain *net_dom, const char *ep_addr, struct m0_reqh *reqh, struct m0_net_buffer_pool *receive_pool, uint32_t colour, m0_bcount_t msg_size, uint32_t queue_len)
uint64_t rs_nr_dropped_items
const struct m0_rpc_item_type * ri_type
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
void m0_rpc_machine_watch_attach(struct m0_rpc_machine_watch *watch)
static void net_buf_received(struct m0_net_buffer *nb, m0_bindex_t offset, m0_bcount_t length, struct m0_net_end_point *from_ep)
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
M0_INTERNAL bool m0_rpc_item_is_conn_establish(const struct m0_rpc_item *item)
static m0_bindex_t offset
struct m0_net_transfer_mc * nep_tm
void m0_rpc_machine_watch_detach(struct m0_rpc_machine_watch *watch)
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_tl rm_incoming_conns
struct m0_rpc_machine machine
static void __rpc_machine_fini(struct m0_rpc_machine *machine)
M0_INTERNAL void m0_net_tm_colour_set(struct m0_net_transfer_mc *tm, uint32_t colour)
void m0_addb2_pop(uint64_t id)
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
struct m0_thread rm_worker
M0_INTERNAL void m0_rpc_conn_cleanup_all_sessions(struct m0_rpc_conn *conn)
static struct m0_pool pool
M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL void m0_rpc_conn_terminate_reply_sent(struct m0_rpc_conn *conn)
void m0_net_end_point_put(struct m0_net_end_point *ep)
void(* mw_mach_terminated)(struct m0_rpc_machine_watch *w)
M0_INTERNAL bool m0_rpc_machine_is_not_locked(const struct m0_rpc_machine *machine)
uint32_t m0_rpc__filter_opcode[4]
#define M0_MAGIX_OFFSET(type, field)
static bool rpc_conn__on_finalised_cb(struct m0_clink *clink)
static struct m0_chan chan[RDWR_REQUEST_MAX]
#define M0_FI_ENABLED(tag)
struct m0_net_end_point * ntm_ep
m0_bcount_t fc_max_nr_bytes_accumulated
static struct m0_rpc_chan * rpc_chan_locate(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep)
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
M0_INTERNAL void m0_rpc_machine_drain_item_sources(struct m0_rpc_machine *machine, uint32_t max_per_source)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
M0_BOB_DEFINE(, &rpc_machine_bob_type, m0_rpc_machine)
static void net_buf_err(struct m0_net_buffer *nb, int32_t status)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
M0_INTERNAL void m0_rpc_packet_remove_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
M0_INTERNAL int32_t m0_net_domain_get_max_buffer_segments(struct m0_net_domain *dom)
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
#define for_each_item_in_packet(item, packet)
M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
M0_INTERNAL void m0_net_buffer_pool_put(struct m0_net_buffer_pool *pool, struct m0_net_buffer *buf, uint32_t colour)
#define M0_MKTIME(secs, ns)
M0_INTERNAL struct m0_rpc_chan * rpc_chan_get(struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_packets_in_flight)
M0_INTERNAL void m0_rpc_machine_add_conn(struct m0_rpc_machine *rmach, struct m0_rpc_conn *conn)
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
static struct m0_rpc_machine * tm_to_rpc_machine(const struct m0_net_transfer_mc *tm)
M0_INTERNAL void rpc_worker_thread_fn(struct m0_rpc_machine *machine)
static void buf_recv_cb(const struct m0_net_buffer_event *ev)
struct m0_rpc_stats rm_stats
M0_INTERNAL void m0_rpc_oneway_item_post_locked(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
static void rpc_tm_event_cb(const struct m0_net_tm_event *ev)
struct m0_rpc_machine * ri_rmachine
M0_INTERNAL void m0_rpc_machine_cleanup_incoming_connections(struct m0_rpc_machine *machine)
static void __rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
M0_INTERNAL void m0_sm_asts_run(struct m0_sm_group *grp)
static void rpc_recv_pool_buffer_put(struct m0_net_buffer *nb)
#define m0_tl_find(name, var, head,...)
#define m0_tl_for(name, head, obj)
M0_INTERNAL int m0_rpc_item_received(struct m0_rpc_item *item, struct m0_rpc_machine *machine)
static int rpc_chan_create(struct m0_rpc_chan **chan, struct m0_rpc_machine *machine, struct m0_net_end_point *dest_ep, uint64_t max_packets_in_flight)
static const struct m0_net_buffer_callbacks rpc_buf_recv_cb
static uint64_t m0_align(uint64_t val, uint64_t alignment)
const struct m0_net_tm_callbacks * ntm_callbacks
const struct m0_rpc_frm_ops m0_rpc_frm_default_ops
uint64_t rs_nr_rcvd_packets
M0_INTERNAL void(* m0_rpc__item_dropped)(struct m0_rpc_item *item)
#define end_for_each_item_in_packet
static bool item_received_fi(struct m0_rpc_item *item)
M0_TL_DEFINE(rpc_chan, static, struct m0_rpc_chan)
static int conn_state(const struct m0_rpc_conn *conn)
struct m0_net_end_point * nb_ep
static const struct m0_bob_type rpc_machine_bob_type
struct m0_net_buffer_pool * ntm_recv_pool