/* -*- C -*- */
/*
 * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * For any questions about this software or licensing,
 * please email opensource@seagate.com or cortx-questions@seagate.com.
 *
 */


#define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
#include "lib/trace.h"
#include "lib/tlist.h"
#include "lib/memory.h"
#include "lib/errno.h"
#include "lib/misc.h" /* M0_IN */
#include "lib/finject.h"
#include "motr/magic.h"
#include "net/net.h"
#include "rpc/bulk.h"
#include "rpc/addb2.h"
#include "rpc/rpc_internal.h"

M0_TL_DESCR_DEFINE(rpcbulk, "rpc bulk buffer list", M0_INTERNAL,
                   struct m0_rpc_bulk_buf, bb_link, bb_magic,
                   M0_RPC_BULK_BUF_MAGIC, M0_RPC_BULK_MAGIC);

M0_EXPORTED(rpcbulk_tl);

M0_TL_DEFINE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf);

static bool rpc_bulk_buf_invariant(const struct m0_rpc_bulk_buf *rbuf)
{
        return rbuf != NULL &&
               rbuf->bb_magic == M0_RPC_BULK_BUF_MAGIC &&
               rbuf->bb_rbulk != NULL &&
               rpcbulk_tlink_is_in(rbuf);
}

static bool rpc_bulk_invariant(const struct m0_rpc_bulk *rbulk)
{
        return
                rbulk != NULL && rbulk->rb_magic == M0_RPC_BULK_MAGIC &&
                m0_mutex_is_locked(&rbulk->rb_mutex) &&
                m0_tl_forall(rpcbulk, buf, &rbulk->rb_buflist,
                             rpc_bulk_buf_invariant(buf) &&
                             buf->bb_rbulk == rbulk);
}

static void rpc_bulk_buf_deregister(struct m0_rpc_bulk_buf *buf)
{
        if (buf->bb_flags & M0_RPC_BULK_NETBUF_REGISTERED) {
                m0_net_buffer_deregister(buf->bb_nbuf, buf->bb_nbuf->nb_dom);
                buf->bb_flags &= ~M0_RPC_BULK_NETBUF_REGISTERED;
        }
}

static void rpc_bulk_buf_fini(struct m0_rpc_bulk_buf *rbuf)
{
        struct m0_net_buffer *nbuf = rbuf->bb_nbuf;

        M0_ENTRY("bulk_buf: %p nb=%p", rbuf, nbuf);
        M0_PRE(rbuf != NULL);
        M0_PRE(!(nbuf->nb_flags & M0_NET_BUF_QUEUED));

        rpcbulk_tlink_fini(rbuf);
        m0_net_desc_free(&nbuf->nb_desc);
        m0_0vec_fini(&rbuf->bb_zerovec);
        rpc_bulk_buf_deregister(rbuf);
        m0_free(nbuf);
        m0_free(rbuf);
        M0_LEAVE();
}

static int rpc_bulk_buf_init(struct m0_rpc_bulk_buf *rbuf, uint32_t segs_nr,
                             m0_bcount_t length, struct m0_net_buffer *nb)
{
        int           rc;
        uint32_t      i;
        struct m0_buf cbuf;
        m0_bindex_t   index = 0;

        M0_ENTRY("bulk_buf: %p, net_buf: %p", rbuf, nb);
        M0_PRE(rbuf != NULL);
        M0_PRE(segs_nr > 0);

        rc = m0_0vec_init(&rbuf->bb_zerovec, segs_nr);
        if (rc != 0)
                return M0_ERR_INFO(rc, "bulk_buf: Zero vector initialization");

        rbuf->bb_flags = 0;
        if (nb == NULL) {
                M0_ALLOC_PTR(rbuf->bb_nbuf);
                if (rbuf->bb_nbuf == NULL) {
                        m0_0vec_fini(&rbuf->bb_zerovec);
                        return M0_ERR(-ENOMEM);
                }
                rbuf->bb_flags |= M0_RPC_BULK_NETBUF_ALLOCATED;
                rbuf->bb_nbuf->nb_buffer = rbuf->bb_zerovec.z_bvec;
        } else {
                rbuf->bb_nbuf = nb;
                /*
                 * The incoming buffer can be bigger than the size the bulk
                 * transfer request refers to. Hence initialize the zero
                 * vector to get the correct size of the bulk transfer.
                 */
                for (i = 0; i < segs_nr; ++i) {
                        cbuf.b_addr = nb->nb_buffer.ov_buf[i];
                        cbuf.b_nob = nb->nb_buffer.ov_vec.v_count[i];
                        rc = m0_0vec_cbuf_add(&rbuf->bb_zerovec, &cbuf, &index);
                        if (rc != 0) {
                                m0_0vec_fini(&rbuf->bb_zerovec);
                                return M0_ERR_INFO(rc, "Addition of cbuf");
                        }
                }
        }
        if (length != 0)
                rbuf->bb_nbuf->nb_length = length;
        rpcbulk_tlink_init(rbuf);
        M0_POST(rpc_bulk_buf_invariant(rbuf));
        return M0_RC(rc);
}

M0_INTERNAL void m0_rpc_bulk_default_cb(const struct m0_net_buffer_event *evt)
{
        struct m0_rpc_bulk     *rbulk;
        struct m0_rpc_bulk_buf *buf;
        struct m0_net_buffer   *nb;

        M0_ENTRY("net_buf_evt: %p", evt);
        M0_PRE(evt != NULL);
        M0_PRE(evt->nbe_buffer != NULL);

        nb = evt->nbe_buffer;
        buf = (struct m0_rpc_bulk_buf *)nb->nb_app_private;
        rbulk = buf->bb_rbulk;

        M0_LOG(M0_DEBUG, "rbulk %p, nbuf %p, nbuf->nb_qtype %lu, "
               "evt->nbe_status %d", rbulk, nb,
               (unsigned long)nb->nb_qtype, evt->nbe_status);
        M0_ASSERT(rpc_bulk_buf_invariant(buf));
        M0_ASSERT(rpcbulk_tlink_is_in(buf));

        if (M0_IN(nb->nb_qtype, (M0_NET_QT_PASSIVE_BULK_RECV,
                                 M0_NET_QT_ACTIVE_BULK_RECV)))
                nb->nb_length = evt->nbe_length;

        m0_mutex_lock(&rbulk->rb_mutex);
        M0_ASSERT_EX(rpc_bulk_invariant(rbulk));
        /*
         * Change the status code of struct m0_rpc_bulk only if it is
         * zero so far. This ensures that the return code of the first
         * failure from the list of net buffers in struct m0_rpc_bulk is
         * maintained. Buffers are canceled by the IO coalescing code,
         * which in turn sends a coalesced buffer and cancels the member
         * buffers. Hence -ECANCELED is not treated as an error here.
         */
        if (rbulk->rb_rc == 0 && evt->nbe_status != -ECANCELED)
                rbulk->rb_rc = evt->nbe_status;

        rpcbulk_tlist_del(buf);
        if (rpcbulk_tlist_is_empty(&rbulk->rb_buflist)) {
                if (m0_chan_has_waiters(&rbulk->rb_chan))
                        m0_chan_signal(&rbulk->rb_chan);
        }
        m0_mutex_unlock(&rbulk->rb_mutex);

        M0_LEAVE("rb_rc=%d", rbulk->rb_rc);
}

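/*
 * Illustrative sketch, not part of the original file: how a caller typically
 * waits for the whole buffer list to drain once m0_rpc_bulk_default_cb() is
 * installed. m0_clink/m0_chan are the standard lib/chan primitives; "rbulk",
 * "conn" and "descs" are assumed to be set up already, and error handling is
 * elided.
 *
 *      struct m0_clink clink;
 *      int             rc;
 *
 *      m0_clink_init(&clink, NULL);
 *      m0_clink_add_lock(&rbulk->rb_chan, &clink);
 *      rc = m0_rpc_bulk_store(rbulk, conn, descs, &m0_rpc__buf_bulk_cb);
 *      if (rc == 0)
 *              m0_chan_wait(&clink);    (woken by m0_chan_signal() above)
 *      m0_clink_del_lock(&clink);
 *      m0_clink_fini(&clink);
 *      rc = rc ?: rbulk->rb_rc;         (first non-canceled failure wins)
 */
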
M0_INTERNAL size_t m0_rpc_bulk_store_del_unqueued(struct m0_rpc_bulk *rbulk)
{
        struct m0_rpc_bulk_buf *rbuf;
        size_t                  unqueued_nr = 0;

        M0_ENTRY("rbulk %p", rbulk);
        M0_PRE(rbulk != NULL);

        if (rbulk->rb_rc != 0)
                M0_LOG(M0_ERROR, "rbulk:%p rc:%d", rbulk, rbulk->rb_rc);

        m0_tl_for (rpcbulk, &rbulk->rb_buflist, rbuf) {
                if (!(rbuf->bb_flags & M0_RPC_BULK_NETBUF_QUEUED)) {
                        rpcbulk_tlist_del(rbuf);
                        rpc_bulk_buf_fini(rbuf);
                        ++unqueued_nr;
                }
        } m0_tl_endfor;

        M0_LEAVE("rbulk %p, unqueued_nr %llu", rbulk,
                 (unsigned long long)unqueued_nr);
        return unqueued_nr;
}

M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
{
        struct m0_rpc_bulk_buf *rbuf;

        M0_ENTRY("rbulk %p", rbulk);
        M0_PRE(rbulk != NULL);

        m0_mutex_lock(&rbulk->rb_mutex);

        if (rbulk->rb_rc != 0)
                M0_LOG(M0_ERROR, "rbulk:%p rc:%d", rbulk, rbulk->rb_rc);

        m0_tl_for (rpcbulk, &rbulk->rb_buflist, rbuf) {
                m0_net_buffer_del(rbuf->bb_nbuf, rbuf->bb_nbuf->nb_tm);
        } m0_tl_endfor;

        m0_mutex_unlock(&rbulk->rb_mutex);

        M0_LEAVE("rbulk %p", rbulk);
}

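/*
 * Illustrative note, not part of the original file: m0_rpc_bulk_store_del()
 * cancels every buffer still on the transfer machine queue. Each cancellation
 * reaches the buffer callback with -ECANCELED, which m0_rpc_bulk_default_cb()
 * above deliberately keeps out of rb_rc.
 */
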
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb = {
        .nbc_cb = {
                [M0_NET_QT_PASSIVE_BULK_SEND] = m0_rpc_bulk_default_cb,
                [M0_NET_QT_PASSIVE_BULK_RECV] = m0_rpc_bulk_default_cb,
                [M0_NET_QT_ACTIVE_BULK_RECV]  = m0_rpc_bulk_default_cb,
                [M0_NET_QT_ACTIVE_BULK_SEND]  = m0_rpc_bulk_default_cb
        }
};

M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
{
        M0_ENTRY("rbulk: %p", rbulk);
        M0_PRE(rbulk != NULL);

        rpcbulk_tlist_init(&rbulk->rb_buflist);
        m0_mutex_init(&rbulk->rb_mutex);
        m0_chan_init(&rbulk->rb_chan, &rbulk->rb_mutex);
        rbulk->rb_magic = M0_RPC_BULK_MAGIC;
        rbulk->rb_bytes = 0;
        rbulk->rb_rc = 0;
        rbulk->rb_id = m0_dummy_id_generate();
        M0_LEAVE();
}
M0_EXPORTED(m0_rpc_bulk_init);

M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
{
        M0_ENTRY("rbulk: %p", rbulk);
        M0_PRE(rbulk != NULL);
        m0_mutex_lock(&rbulk->rb_mutex);
        M0_PRE_EX(rpc_bulk_invariant(rbulk));
        M0_PRE(rpcbulk_tlist_is_empty(&rbulk->rb_buflist));
        m0_mutex_unlock(&rbulk->rb_mutex);

        m0_chan_fini_lock(&rbulk->rb_chan);
        m0_mutex_fini(&rbulk->rb_mutex);
        rpcbulk_tlist_fini(&rbulk->rb_buflist);
        M0_LEAVE();
}
M0_EXPORTED(m0_rpc_bulk_fini);

M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
{
        struct m0_rpc_bulk_buf *buf;

        m0_mutex_lock(&rbulk->rb_mutex);
        M0_ASSERT_EX(rpc_bulk_invariant(rbulk));
        m0_tl_teardown(rpcbulk, &rbulk->rb_buflist, buf) {
                rpc_bulk_buf_fini(buf);
        }
        m0_mutex_unlock(&rbulk->rb_mutex);
}

M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk,
                                    uint32_t segs_nr, m0_bcount_t length,
                                    struct m0_net_domain *netdom,
                                    struct m0_net_buffer *nb,
                                    struct m0_rpc_bulk_buf **out)
{
        int                     rc;
        struct m0_rpc_bulk_buf *buf;

        M0_ENTRY("rbulk: %p, net_dom: %p, net_buf: %p", rbulk, netdom, nb);
        M0_PRE(rbulk != NULL);
        M0_PRE(netdom != NULL);
        M0_PRE(out != NULL);

        if (segs_nr > m0_net_domain_get_max_buffer_segments(netdom) ||
            length > m0_net_domain_get_max_buffer_size(netdom))
                return M0_ERR_INFO(-EMSGSIZE, "Cannot exceed net_max_buf_seg");

        M0_ALLOC_PTR(buf);
        if (buf == NULL)
                return M0_ERR(-ENOMEM);

        rc = rpc_bulk_buf_init(buf, segs_nr, length, nb);
        if (rc != 0) {
                m0_free(buf);
                return M0_RC(rc);
        }

        m0_mutex_lock(&rbulk->rb_mutex);
        buf->bb_rbulk = rbulk;
        rpcbulk_tlist_add_tail(&rbulk->rb_buflist, buf);
        M0_POST_EX(rpc_bulk_invariant(rbulk));
        m0_mutex_unlock(&rbulk->rb_mutex);
        *out = buf;
        M0_POST(rpc_bulk_buf_invariant(buf));

        return M0_RC(0);
}
M0_EXPORTED(m0_rpc_bulk_buf_add);

M0_INTERNAL int m0_rpc_bulk_buf_databuf_add(struct m0_rpc_bulk_buf *rbuf,
                                            void *buf,
                                            m0_bcount_t count,
                                            m0_bindex_t index,
                                            struct m0_net_domain *netdom)
{
        int                 rc;
        struct m0_buf       cbuf;
        struct m0_rpc_bulk *rbulk;

        M0_ENTRY("rbuf: %p, netdom: %p cnt=0x%" PRIx64 " idx=0x%" PRIx64,
                 rbuf, netdom, count, index);
        M0_PRE(rbuf != NULL);
        M0_PRE(rpc_bulk_buf_invariant(rbuf));
        M0_PRE(buf != NULL);
        M0_PRE(count != 0);
        M0_PRE(netdom != NULL);

        if (rbuf->bb_zerovec.z_count + count >
            m0_net_domain_get_max_buffer_size(netdom) ||
            rbuf->bb_zerovec.z_bvec.ov_vec.v_nr + 1 >
            m0_net_domain_get_max_buffer_segments(netdom)) {
                M0_LOG(M0_DEBUG, "Cannot exceed net_dom_max_buf_segs");
                return M0_RC(-EMSGSIZE); /* Not an error, no M0_ERR(). */
        }

        cbuf.b_addr = buf;
        cbuf.b_nob = count;
        rbulk = rbuf->bb_rbulk;
        rc = m0_0vec_cbuf_add(&rbuf->bb_zerovec, &cbuf, &index);
        if (rc != 0)
                return M0_ERR_INFO(rc, "Addition of cbuf");

        rbuf->bb_nbuf->nb_buffer = rbuf->bb_zerovec.z_bvec;
        m0_mutex_lock(&rbulk->rb_mutex);
        M0_POST_EX(rpc_bulk_invariant(rbulk));
        m0_mutex_unlock(&rbulk->rb_mutex);
        return M0_RC(rc);
}
M0_EXPORTED(m0_rpc_bulk_buf_databuf_add);

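/*
 * Illustrative sketch, not part of the original file: assembling a bulk
 * buffer on the passive side with the two calls above. "SEGS_NR",
 * "SEG_SIZE", "segs" and "netdom" are hypothetical; a real caller sizes
 * them against the net-domain limits checked in both functions.
 *
 *      struct m0_rpc_bulk      rbulk;
 *      struct m0_rpc_bulk_buf *rbuf;
 *      uint32_t                i;
 *      int                     rc;
 *
 *      m0_rpc_bulk_init(&rbulk);
 *      rc = m0_rpc_bulk_buf_add(&rbulk, SEGS_NR, 0, netdom, NULL, &rbuf);
 *      for (i = 0; rc == 0 && i < SEGS_NR; ++i)
 *              rc = m0_rpc_bulk_buf_databuf_add(rbuf, segs[i], SEG_SIZE,
 *                                               (m0_bindex_t)i * SEG_SIZE,
 *                                               netdom);
 */
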
M0_INTERNAL void m0_rpc_bulk_qtype(struct m0_rpc_bulk *rbulk,
                                   enum m0_net_queue_type q)
{
        struct m0_rpc_bulk_buf *rbuf;

        M0_ENTRY("rpc_bulk: %p, qtype: %d", rbulk, q);
        M0_PRE(rbulk != NULL);
        M0_PRE(m0_mutex_is_locked(&rbulk->rb_mutex));
        M0_PRE(!rpcbulk_tlist_is_empty(&rbulk->rb_buflist));
        M0_PRE(M0_IN(q, (M0_NET_QT_PASSIVE_BULK_RECV,
                         M0_NET_QT_PASSIVE_BULK_SEND,
                         M0_NET_QT_ACTIVE_BULK_RECV,
                         M0_NET_QT_ACTIVE_BULK_SEND)));

        m0_tl_for(rpcbulk, &rbulk->rb_buflist, rbuf) {
                rbuf->bb_nbuf->nb_qtype = q;
        } m0_tl_endfor;
        M0_LEAVE();
}

static void addb2_add_rpc_bulk_attr(struct m0_rpc_bulk *rbulk,
                                    enum m0_rpc_bulk_op_type op,
                                    uint32_t buf_nr,
                                    uint64_t seg_nr)
{
        M0_ADDB2_ADD(M0_AVI_ATTR, rbulk->rb_id, M0_AVI_RPC_BULK_ATTR_OP,
                     op);
        M0_ADDB2_ADD(M0_AVI_ATTR, rbulk->rb_id, M0_AVI_RPC_BULK_ATTR_BUF_NR,
                     buf_nr);
        M0_ADDB2_ADD(M0_AVI_ATTR, rbulk->rb_id, M0_AVI_RPC_BULK_ATTR_BYTES,
                     rbulk->rb_bytes);
        M0_ADDB2_ADD(M0_AVI_ATTR, rbulk->rb_id, M0_AVI_RPC_BULK_ATTR_SEG_NR,
                     seg_nr);
}

static int rpc_bulk_op(struct m0_rpc_bulk *rbulk,
                       const struct m0_rpc_conn *conn,
                       struct m0_net_buf_desc_data *descs,
                       enum m0_rpc_bulk_op_type op,
                       const struct m0_net_buffer_callbacks *bulk_cb)
{
        int                        rc = 0;
        int                        cnt = 0;
        struct m0_rpc_bulk_buf    *rbuf;
        struct m0_net_transfer_mc *tm;
        struct m0_net_buffer      *nb;
        struct m0_net_domain      *netdom;
        struct m0_rpc_machine     *rpcmach;
        uint64_t                   seg_nr = 0;

        M0_ENTRY("rbulk: %p, rpc_conn: %p, rbulk_op_type: %d", rbulk, conn, op);
        M0_PRE(rbulk != NULL);
        M0_PRE(descs != NULL);
        M0_PRE(M0_IN(op, (M0_RPC_BULK_STORE, M0_RPC_BULK_LOAD)));
        M0_PRE(bulk_cb != NULL);

        rpcmach = conn->c_rpc_machine;
        tm = &rpcmach->rm_tm;
        netdom = tm->ntm_dom;
        m0_mutex_lock(&rbulk->rb_mutex);
        M0_ASSERT_EX(rpc_bulk_invariant(rbulk));
        M0_ASSERT(!rpcbulk_tlist_is_empty(&rbulk->rb_buflist));

        m0_tl_for(rpcbulk, &rbulk->rb_buflist, rbuf) {
                nb = rbuf->bb_nbuf;
                if (nb->nb_length == 0)
                        nb->nb_length =
                                m0_vec_count(&nb->nb_buffer.ov_vec);
                M0_ASSERT(rpc_bulk_buf_invariant(rbuf));
                M0_ASSERT(ergo(op == M0_RPC_BULK_STORE, M0_IN(nb->nb_qtype,
                                        (M0_NET_QT_PASSIVE_BULK_RECV,
                                         M0_NET_QT_PASSIVE_BULK_SEND))) &&
                          ergo(op == M0_RPC_BULK_LOAD, M0_IN(nb->nb_qtype,
                                        (M0_NET_QT_ACTIVE_BULK_RECV,
                                         M0_NET_QT_ACTIVE_BULK_SEND))));
                nb->nb_callbacks = bulk_cb;

                /*
                 * Registers the net buffer with the net domain if it is not
                 * registered already.
                 */
                if (!(nb->nb_flags & M0_NET_BUF_REGISTERED)) {
                        rc = m0_net_buffer_register(nb, netdom);
                        if (rc != 0)
                                goto cleanup;
                        rbuf->bb_flags |= M0_RPC_BULK_NETBUF_REGISTERED;
                }

                if (M0_FI_ENABLED("timeout_2s"))
                        nb->nb_timeout = m0_time_from_now(2, 0);

                if (op == M0_RPC_BULK_LOAD) {
                        rc = m0_net_desc_copy(&descs[cnt].bdd_desc,
                                              &nb->nb_desc);
                        if (rc != 0)
                                goto cleanup;
                }

                nb->nb_app_private = rbuf;
                rc = m0_net_buffer_add(nb, tm);
                if (rc != 0)
                        goto cleanup;
                rbuf->bb_flags |= M0_RPC_BULK_NETBUF_QUEUED;

                if (op == M0_RPC_BULK_STORE) {
                        rc = m0_net_desc_copy(&nb->nb_desc,
                                              &descs[cnt].bdd_desc);
                        if (rc != 0) {
                                m0_net_buffer_del(nb, tm);
                                rbuf->bb_flags &= ~M0_RPC_BULK_NETBUF_QUEUED;
                                goto cleanup;
                        }
                        descs[cnt].bdd_used = nb->nb_length;
                }

                seg_nr += nb->nb_buffer.ov_vec.v_nr;
                rbulk->rb_bytes += nb->nb_length;
                ++cnt;
        } m0_tl_endfor;
        addb2_add_rpc_bulk_attr(rbulk, op, cnt, seg_nr);
        M0_POST_EX(rpc_bulk_invariant(rbulk));
        m0_mutex_unlock(&rbulk->rb_mutex);

        return M0_RC(rc);
cleanup:
        M0_ASSERT(rc != 0);

        M0_LOG(M0_DEBUG, "rbulk %p, rc %d", rbulk, rc);
        rpcbulk_tlist_del(rbuf);
        m0_tl_for(rpcbulk, &rbulk->rb_buflist, rbuf) {
                if (rbuf->bb_flags & M0_RPC_BULK_NETBUF_QUEUED) {
                        m0_net_buffer_del(rbuf->bb_nbuf, tm);
                        rbuf->bb_flags &= ~M0_RPC_BULK_NETBUF_QUEUED;
                }
        } m0_tl_endfor;
        m0_mutex_unlock(&rbulk->rb_mutex);
        return M0_ERR(rc);
}

M0_INTERNAL int
m0_rpc_bulk_store(struct m0_rpc_bulk *rbulk,
                  const struct m0_rpc_conn *conn,
                  struct m0_net_buf_desc_data *to_desc,
                  const struct m0_net_buffer_callbacks *bulk_cb)
{
        return rpc_bulk_op(rbulk, conn, to_desc, M0_RPC_BULK_STORE, bulk_cb);
}
M0_EXPORTED(m0_rpc_bulk_store);

M0_INTERNAL int
m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk,
                 const struct m0_rpc_conn *conn,
                 struct m0_net_buf_desc_data *from_desc,
                 const struct m0_net_buffer_callbacks *bulk_cb)
{
        return rpc_bulk_op(rbulk, conn, from_desc, M0_RPC_BULK_LOAD, bulk_cb);
}
M0_EXPORTED(m0_rpc_bulk_load);

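/*
 * Illustrative note, not part of the original file: the two wrappers are
 * mirror images. The passive side calls m0_rpc_bulk_store() to queue its
 * buffers and fill "to_desc" with transfer descriptors that travel to the
 * peer inside a fop; the active side feeds the received descriptors to
 * m0_rpc_bulk_load(), which queues its buffers on the active queues and
 * lets the transport move the data. "descs" below stands for the descriptor
 * array carried by the request fop; the real field name depends on the fop
 * type.
 *
 *      rc = m0_rpc_bulk_load(rbulk, conn, descs, &m0_rpc__buf_bulk_cb);
 */
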
M0_INTERNAL bool m0_rpc_bulk_is_empty(struct m0_rpc_bulk *rbulk)
{
        bool empty;

        m0_mutex_lock(&rbulk->rb_mutex);
        empty = rpcbulk_tlist_is_empty(&rbulk->rb_buflist);
        m0_mutex_unlock(&rbulk->rb_mutex);

        return empty;
}

M0_INTERNAL size_t m0_rpc_bulk_buf_length(struct m0_rpc_bulk *rbulk)
{
        size_t buf_nr;

        m0_mutex_lock(&rbulk->rb_mutex);
        buf_nr = rpcbulk_tlist_length(&rbulk->rb_buflist);
        m0_mutex_unlock(&rbulk->rb_mutex);
        return buf_nr;
}

#undef M0_TRACE_SUBSYSTEM

/*
 *  Local variables:
 *  c-indentation-style: "K&R"
 *  c-basic-offset: 8
 *  tab-width: 8
 *  fill-column: 80
 *  scroll-step: 1
 *  End:
 */