23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CM 405 struct m0_cm_cp, c_cm_proxy_linkage, c_magix,
502 .sd_name =
"Write-pre",
508 .sd_name =
"TX Open",
519 .sd_name =
"TX Done",
524 .sd_name =
"IO Wait",
537 .sd_name =
"Sliding window check",
548 .sd_name =
"Send Wait",
553 .sd_name =
"Recv Init",
558 .sd_name =
"Recv Wait",
564 .sd_name =
"Failure",
591 return m0_cm_cp_bob_check(cp) &&
ops !=
NULL &&
599 m0_cm_cp_bob_init(cp);
604 proxy_cp_tlink_init(cp);
646 if (!rpcbulk_tlist_is_empty(&rbulk->
rb_buflist)) {
649 size_t non_queued_buf_nr;
653 buf_nr = rpcbulk_tlist_length(&rbulk->
rb_buflist);
660 (
unsigned long long)buf_nr,
661 (
unsigned long long)non_queued_buf_nr);
666 if (buf_nr > non_queued_buf_nr) {
678 proxy_cp_tlink_fini(cp);
679 m0_cm_cp_bob_fini(cp);
697 return M0_ERR(-ESHUTDOWN);
708 cp_data_buf_tlink_init(nb);
709 cp_data_buf_tlist_add_tail(&cp->
c_buffers, nb);
720 nbuf_head = cp_data_buf_tlist_head(&cp->
c_buffers);
721 if (nbuf_head !=
NULL) {
728 cp_data_buf_tlink_del_fini(nbuf);
744 for (
i = 0;
i < bm->
b_nr; ++
i) {
758 nbuf_head = cp_data_buf_tlist_head(&cp->
c_buffers);
760 while ((nbuf = cp_data_buf_tlist_next(&cp->
c_buffers, nbuf)) !=
NULL) {
776 nbuf_head = cp_data_buf_tlist_head(&cp->
c_buffers);
779 if (new_v_count ==
NULL)
783 for (
i = 0;
i < new_v_nr; ++
i)
799 cp_data_buf_tlink_del_fini(nbuf);
810 cm =
src->c_ag->cag_cm;
819 src->c_xform_cp_indices.b_nr);
821 if (
src->c_xform_cp_indices.b_nr > 0)
835 uint64_t total_data_seg_nr;
837 M0_PRE(!cp_data_buf_tlist_is_empty(&
src->c_buffers));
838 M0_PRE(!cp_data_buf_tlist_is_empty(&
dst->c_buffers));
841 total_data_seg_nr =
src->c_data_seg_nr;
842 for (src_nbuf = cp_data_buf_tlist_head(&
src->c_buffers),
843 dst_nbuf = cp_data_buf_tlist_head(&
dst->c_buffers);
844 src_nbuf !=
NULL && dst_nbuf !=
NULL;
845 src_nbuf = cp_data_buf_tlist_next(&
src->c_buffers, src_nbuf),
846 dst_nbuf = cp_data_buf_tlist_next(&
dst->c_buffers, dst_nbuf)) {
M0_INTERNAL m0_bcount_t m0_bufvec_copy(struct m0_bufvec *dst, struct m0_bufvec *src, m0_bcount_t num_bytes)
M0_INTERNAL void m0_cm_cp_buf_release(struct m0_cm_cp *cp)
bool(* co_invariant)(const struct m0_cm_cp *cp)
M0_INTERNAL void m0_cm_cp_only_fini(struct m0_cm_cp *cp)
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
#define M0_ALLOC_ARR(arr, nr)
const struct m0_cm_type * cm_type
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
struct m0_net_buffer_pool * nb_pool
M0_INTERNAL int m0_cm_cp_fom_create(struct m0_fop *fop, struct m0_fop *r_fop, struct m0_fom **m, struct m0_reqh *reqh)
struct m0_chan c_reply_wait
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
static struct m0_addb2_mach * m
struct m0_bitmap c_xform_cp_indices
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
struct m0_bufvec nb_buffer
const struct m0_cm_ops * cm_ops
#define M0_LOG(level,...)
static int cp_fom_tick(struct m0_fom *fom)
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
struct m0_mutex c_reply_wait_mutex
M0_INTERNAL int m0_bufvec_merge(struct m0_bufvec *dst_bufvec, struct m0_bufvec *src_bufvec)
const struct m0_net_buffer_pool_ops * nbp_ops
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
static const struct m0_sm_conf m0_cm_cp_sm_conf
M0_INTERNAL void m0_cm_cp_fom_init(struct m0_cm *cm, struct m0_cm_cp *cp, struct m0_fop *fop, struct m0_fop *r_fop)
M0_INTERNAL int m0_cm_cp_bufvec_split(struct m0_cm_cp *cp)
#define container_of(ptr, type, member)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
M0_INTERNAL void m0_cm_cp_only_init(struct m0_cm *cm, struct m0_cm_cp *cp)
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
struct m0_fom_type ft_fom_type
M0_INTERNAL void m0_cm_cp_fom_fini(struct m0_fom *fom)
M0_INTERNAL int m0_cm_cp_dup(struct m0_cm_cp *src, struct m0_cm_cp **dest)
static struct m0_net_buffer_pool nbp
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
M0_INTERNAL void m0_cm_cp_data_copy(struct m0_cm_cp *src, struct m0_cm_cp *dst)
struct m0_fop_type * f_type
struct m0_fom_type ct_fomt
return M0_ERR(-EOPNOTSUPP)
struct m0_rpc_bulk c_bulk
static uint64_t cp_fom_locality(const struct m0_fom *fom)
void m0_fom_fini(struct m0_fom *fom)
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
M0_INTERNAL void m0_cm_cp_buf_move(struct m0_cm_cp *src, struct m0_cm_cp *dest)
static const struct m0_bob_type cp_bob
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
#define bob_of(ptr, type, field, bt)
M0_INTERNAL void m0_cm_cp_fini(struct m0_cm_cp *cp)
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
const struct m0_cm_cp_ops * c_ops
void(* nbpo_not_empty)(struct m0_net_buffer_pool *)
M0_INTERNAL void m0_cm_cp_buf_add(struct m0_cm_cp *cp, struct m0_net_buffer *nb)
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
int(* co_action[])(struct m0_cm_cp *cp)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
struct m0_cm_aggr_group * c_ag
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_cm_cp_bufvec_merge(struct m0_cm_cp *cp)
M0_INTERNAL void m0_cm_buffer_put(struct m0_net_buffer_pool *bp, struct m0_net_buffer *buf, uint64_t colour)
M0_INTERNAL void m0_bitmap_copy(struct m0_bitmap *dst, const struct m0_bitmap *src)
M0_INTERNAL int m0_reqh_state_get(struct m0_reqh *reqh)
struct m0_cm_cp *(* cmo_cp_alloc)(struct m0_cm *cm)
M0_INTERNAL size_t m0_rpc_bulk_store_del_unqueued(struct m0_rpc_bulk *rbulk)
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
M0_INTERNAL int m0_cm_cp_enqueue(struct m0_cm *cm, struct m0_cm_cp *cp)
#define M0_MAGIX_OFFSET(type, field)
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
M0_BOB_DEFINE(static, &cp_bob, m0_cm_cp)
static const struct m0_fom_ops cp_fom_ops
struct m0_reqh_service cm_service
M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf)
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
M0_TL_DESCR_DEFINE(cp_data_buf, "copy packet data buffers", M0_INTERNAL, struct m0_net_buffer, nb_extern_linkage, nb_magic, M0_NET_BUFFER_LINK_MAGIC, CM_CP_DATA_BUF_HEAD_MAGIX)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
M0_INTERNAL bool m0_rpc_bulk_is_empty(struct m0_rpc_bulk *rbulk)
static struct m0_fop * fop
struct m0_reqh_service_type ct_stype
void(* co_free)(struct m0_cm_cp *cp)
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
uint64_t(* co_home_loc_helper)(const struct m0_cm_cp *cp)
M0_INTERNAL void m0_cm_cp_init(struct m0_cm_type *cmtype, const struct m0_fom_type_ops *ft_ops)
void(* fo_fini)(struct m0_fom *fom)
M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
M0_INTERNAL uint64_t m0_cm_cp_nr(struct m0_cm_cp *cp)
#define m0_tl_for(name, head, obj)
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
const struct m0_reqh_service_type * ft_rstype
M0_INTERNAL void m0_cm_ag_cp_add_locked(struct m0_cm_aggr_group *ag, struct m0_cm_cp *cp)
M0_INTERNAL bool m0_cm_cp_invariant(const struct m0_cm_cp *cp)
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
struct m0_pdclust_src_addr src
M0_TL_DEFINE(cp_data_buf, M0_INTERNAL, struct m0_net_buffer)
static struct m0_sm_state_descr m0_cm_cp_state_descr[]