23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 45 M0_EXPORTED(rpcbulk_tl);
51 return rbuf !=
NULL &&
54 rpcbulk_tlink_is_in(rbuf);
64 buf->bb_rbulk == rbulk);
78 M0_ENTRY(
"bulk_buf: %p nb=%p", rbuf, nbuf);
99 M0_ENTRY(
"bulk_buf: %p, net_buf: %p", rbuf, nb);
105 return M0_ERR_INFO(
rc,
"bulk_buf: Zero vector initialization");
123 for (
i = 0;
i < segs_nr; ++
i) {
135 rpcbulk_tlink_init(rbuf);
152 rbulk =
buf->bb_rbulk;
155 "evt->nbe_status %d", rbulk, nb,
177 rpcbulk_tlist_del(
buf);
179 if (rpcbulk_tlist_is_empty(&rbulk->
rb_buflist)) {
193 size_t unqueued_nr = 0;
198 if (rbulk->
rb_rc != 0)
204 rpcbulk_tlist_del(rbuf);
210 M0_LEAVE(
"rbulk %p, unqueued_nr %llu", rbulk,
211 (
unsigned long long)unqueued_nr);
224 if (rbulk->
rb_rc != 0)
300 M0_ENTRY(
"rbulk: %p, net_dom: %p, net_buf: %p", rbulk, netdom, nb);
307 return M0_ERR_INFO(-EMSGSIZE,
"Cannot exceed net_max_buf_seg");
320 buf->bb_rbulk = rbulk;
353 return M0_RC(-EMSGSIZE);
377 M0_ENTRY(
"rpc_bulk: %p, qtype: %d", rbulk,
q);
426 M0_ENTRY(
"rbulk: %p, rpc_conn: %p, rbulk_op_type: %d", rbulk,
conn,
op);
433 tm = &rpcmach->
rm_tm;
508 rpcbulk_tlist_del(rbuf);
555 buf_nr = rpcbulk_tlist_length(&rbulk->
rb_buflist);
560 #undef M0_TRACE_SUBSYSTEM
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_segment_size(struct m0_net_domain *dom)
M0_INTERNAL int m0_0vec_init(struct m0_0vec *zvec, uint32_t segs_nr)
struct m0_net_transfer_mc * nb_tm
static int rpc_bulk_buf_init(struct m0_rpc_bulk_buf *rbuf, uint32_t segs_nr, m0_bcount_t length, struct m0_net_buffer *nb)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
static struct m0_semaphore q
M0_INTERNAL void m0_0vec_fini(struct m0_0vec *zvec)
struct m0_rpc_bulk * bb_rbulk
M0_INTERNAL int m0_rpc_bulk_store(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *to_desc, const struct m0_net_buffer_callbacks *bulk_cb)
struct m0_bufvec nb_buffer
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
M0_INTERNAL bool m0_chan_has_waiters(struct m0_chan *chan)
M0_INTERNAL int m0_rpc_bulk_buf_databuf_add(struct m0_rpc_bulk_buf *rbuf, void *buf, m0_bcount_t count, m0_bindex_t index, struct m0_net_domain *netdom)
#define M0_LOG(level,...)
static bool rpc_bulk_buf_invariant(const struct m0_rpc_bulk_buf *rbuf)
M0_TL_DESCR_DEFINE(rpcbulk, "rpc bulk buffer list", M0_INTERNAL, struct m0_rpc_bulk_buf, bb_link, bb_magic, M0_RPC_BULK_BUF_MAGIC, M0_RPC_BULK_MAGIC)
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
struct m0_net_domain * ntm_dom
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
struct m0_net_buffer * nbe_buffer
M0_TL_DEFINE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf)
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
#define M0_ASSERT_EX(cond)
static void bulk_cb(struct m0_net_test_network_ctx *ctx, const uint32_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
M0_INTERNAL uint64_t m0_dummy_id_generate(void)
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
struct m0_rpc_machine * c_rpc_machine
#define M0_ERR_INFO(rc, fmt,...)
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_bulk_default_cb(const struct m0_net_buffer_event *evt)
#define m0_tl_teardown(name, head, obj)
enum m0_net_queue_type nb_qtype
static int rpc_bulk_op(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *descs, enum m0_rpc_bulk_op_type op, const struct m0_net_buffer_callbacks *bulk_cb)
struct m0_net_buffer * bb_nbuf
struct m0_net_transfer_mc rm_tm
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
const struct m0_net_buffer_callbacks * nb_callbacks
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
struct m0_0vec bb_zerovec
static bool rpc_bulk_invariant(const struct m0_rpc_bulk *rbulk)
M0_INTERNAL int m0_0vec_cbuf_add(struct m0_0vec *zvec, const struct m0_buf *buf, const m0_bindex_t *index)
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_size(struct m0_net_domain *dom)
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
M0_INTERNAL int m0_net_desc_copy(const struct m0_net_buf_desc *from_desc, struct m0_net_buf_desc *to_desc)
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
static void rpc_bulk_buf_deregister(struct m0_rpc_bulk_buf *buf)
M0_INTERNAL size_t m0_rpc_bulk_buf_length(struct m0_rpc_bulk *rbulk)
struct m0_net_buf_desc bdd_desc
M0_INTERNAL size_t m0_rpc_bulk_store_del_unqueued(struct m0_rpc_bulk *rbulk)
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
#define M0_FI_ENABLED(tag)
static void rpc_bulk_buf_fini(struct m0_rpc_bulk_buf *rbuf)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
static void addb2_add_rpc_bulk_attr(struct m0_rpc_bulk *rbulk, enum m0_rpc_bulk_op_type op, uint32_t buf_nr, uint64_t seg_nr)
m0_time_t m0_time_from_now(uint64_t secs, long ns)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL bool m0_rpc_bulk_is_empty(struct m0_rpc_bulk *rbulk)
M0_INTERNAL int32_t m0_net_domain_get_max_buffer_segments(struct m0_net_domain *dom)
struct m0_net_buf_desc bdd_desc
M0_INTERNAL void m0_rpc_bulk_qtype(struct m0_rpc_bulk *rbulk, enum m0_net_queue_type q)
struct m0_net_buf_desc nb_desc
M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
#define m0_tl_for(name, head, obj)
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
#define m0_tl_forall(name, var, head,...)