Motr  M0
bulkio_client.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "ut/ut.h"
24 #include "lib/memory.h"
25 #include "lib/misc.h"
26 #include "motr/magic.h"
27 #include "ioservice/io_fops.h" /* m0_io_fop */
28 #include "rpc/rpc.h" /* m0_rpc_bulk, m0_rpc_bulk_buf */
29 #include "file/file.h"
30 #include "lib/finject.h"
31 
32 #ifdef __KERNEL__
34 #endif
35 
36 enum {
38 };
39 
40 M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN);
41 extern struct m0_fop_cob_rw *io_rw_get(struct m0_fop *fop);
42 
43 static void bulkio_tm_cb(const struct m0_net_tm_event *ev)
44 {
45 }
46 
49 };
50 
51 /*
52  * This structure represents message sending/receiving entity on network
53  * for either client or server. Since rpc can not be used as is,
54  * this structure just tries to start transfer machines on both ends.
55  * Since ultimately rpc uses transfer machine rpc, message
56  * sending/receiving can be achieved easily without having to go
57  * through rpc interfaces.
58  * The m0_rpc_conn member is just a placeholder. It is needed for
59  * m0_rpc_bulk_{store/load} APIs.
60  */
61 struct bulkio_msg_tm {
64  const char *bmt_addr;
65 };
66 
67 static void bulkio_msg_tm_init(struct bulkio_msg_tm *bmt,
68  struct m0_net_domain *nd)
69 {
70  int rc;
71  struct m0_clink clink;
72  struct m0_net_transfer_mc *tm;
73 
74  M0_UT_ASSERT(bmt != NULL);
75  M0_UT_ASSERT(nd != NULL);
76  M0_UT_ASSERT(bmt->bmt_addr != NULL);
78 
79  tm = &bmt->bmt_mach.rm_tm;
80  M0_SET0(&bmt->bmt_conn);
81  bmt->bmt_conn.c_rpc_machine = &bmt->bmt_mach;
82 
85  rc = m0_net_tm_init(tm, nd);
86  M0_UT_ASSERT(rc == 0);
87 
90  rc = m0_net_tm_start(tm, bmt->bmt_addr);
91  M0_UT_ASSERT(rc == 0);
92 
93  while (tm->ntm_state != M0_NET_TM_STARTED)
95 
98 }
99 
100 static void bulkio_msg_tm_fini(struct bulkio_msg_tm *bmt)
101 {
102  int rc;
103  struct m0_clink clink;
104 
105  M0_UT_ASSERT(bmt != NULL);
106  M0_UT_ASSERT(bmt->bmt_addr != NULL);
109 
112 
113  rc = m0_net_tm_stop(&bmt->bmt_mach.rm_tm, false);
114  M0_UT_ASSERT(rc == 0);
115 
116  while (bmt->bmt_mach.rm_tm.ntm_state != M0_NET_TM_STOPPED)
118 
121 
123 }
124 
125 static void bulkclient_test(void)
126 {
127  int rc;
128  int i = 0;
129  char *sbuf;
130  int32_t max_segs;
131  m0_bcount_t max_seg_size;
132  m0_bcount_t max_buf_size;
133  struct m0_fid fid;
134  struct m0_clink s_clink;
135  struct m0_clink c_clink;
136  struct m0_io_fop *iofop;
137  struct m0_rpc_bulk *rbulk;
138  struct m0_rpc_bulk *sbulk;
139  struct m0_fop_cob_rw *rw;
140  struct m0_net_domain *nd;
141  struct m0_net_buffer *nb;
142  struct m0_net_buffer **nbufs;
143  struct m0_rpc_bulk_buf *rbuf;
144  struct m0_rpc_bulk_buf *rbuf1;
145  struct m0_rpc_bulk_buf *rbuf2;
146  const char *caddr = "0@lo:12345:34:*";
147  const char *saddr = "0@lo:12345:34:8";
148  enum m0_net_queue_type q;
149  struct bulkio_msg_tm *ctm;
150  struct bulkio_msg_tm *stm;
151  struct m0_rm_domain *rm_dom;
152  struct m0_file *file;
153 
154  struct m0_rm_resource_type flock_rt = {
155  .rt_name = "File Lock Resource Type"
156  };
157 
158  M0_ALLOC_PTR(nd);
159  M0_ASSERT(nd != NULL);
160  M0_SET0(nd);
161  M0_ALLOC_PTR(iofop);
162  M0_ASSERT(iofop != NULL);
163  M0_SET0(iofop);
165  M0_ASSERT(file != NULL);
166  M0_SET0(file);
167 
168  M0_ALLOC_PTR(rm_dom);
169  M0_ASSERT(rm_dom != NULL);
170  m0_rm_domain_init(rm_dom);
171  rc = m0_file_lock_type_register(rm_dom, &flock_rt);
172  M0_ASSERT(rc == 0);
173 
175  M0_UT_ASSERT(rc == 0);
176 
177  fid.f_container = 1;
178  fid.f_key = 4;
179  m0_file_init(file, &fid, rm_dom, M0_DI_NONE);
180 
181  /* Test : m0_io_fop_init() */
183  M0_UT_ASSERT(rc == 0);
185  M0_UT_ASSERT(iofop->if_fop.f_type != NULL);
187  M0_UT_ASSERT(iofop->if_fop.f_item.ri_ops != NULL);
188 
191  M0_UT_ASSERT(iofop->if_rbulk.rb_bytes == 0);
192  M0_UT_ASSERT(iofop->if_rbulk.rb_rc == 0);
193 
194  /* Test : m0_fop_to_rpcbulk() */
195  rbulk = m0_fop_to_rpcbulk(&iofop->if_fop);
196  M0_UT_ASSERT(rbulk != NULL);
197  M0_UT_ASSERT(rbulk == &iofop->if_rbulk);
198 
199  /* Test : m0_rpc_bulk_buf_add() */
200  rc = m0_rpc_bulk_buf_add(rbulk, IO_SINGLE_BUFFER, 0, nd, NULL, &rbuf);
201  M0_UT_ASSERT(rc == 0);
202  M0_UT_ASSERT(rbuf != NULL);
203 
204  /* Test : m0_rpc_bulk_buf structure. */
205  M0_UT_ASSERT(m0_tlink_is_in(&rpcbulk_tl, rbuf));
207  M0_UT_ASSERT(rbuf->bb_rbulk == rbulk);
208  M0_UT_ASSERT(rbuf->bb_nbuf != NULL);
209 
210  /*
211  * Since no external net buffer was passed to m0_rpc_bulk_buf_add(),
212  * it should allocate a net buffer internally and m0_rpc_bulk_buf::
213  * bb_flags should be M0_RPC_BULK_NETBUF_ALLOCATED.
214  */
216 
217  /* Test : m0_rpc_bulk_buf_add() - Error case. */
219  rc = m0_rpc_bulk_buf_add(rbulk, max_segs + 1, 0, nd, NULL, &rbuf1);
220  M0_UT_ASSERT(rc == -EMSGSIZE);
221 
222  /* Test : m0_rpc_bulk_buf_databuf_add(). */
223  sbuf = m0_alloc_aligned(M0_0VEC_ALIGN, M0_0VEC_SHIFT);
224  M0_UT_ASSERT(sbuf != NULL);
225  memset(sbuf, 'a', M0_0VEC_ALIGN);
226  rc = m0_rpc_bulk_buf_databuf_add(rbuf, sbuf, M0_0VEC_ALIGN, 0, nd);
227  M0_UT_ASSERT(rc == 0);
229  M0_0VEC_ALIGN);
232 
233  /* Test : m0_rpc_bulk_buf_databuf_add() - Error case. */
235  rc = m0_rpc_bulk_buf_databuf_add(rbuf, sbuf, max_seg_size + 1, 0, nd);
236  /* Segment size bigger than permitted segment size. */
237  M0_UT_ASSERT(rc == -EMSGSIZE);
238 
239  max_buf_size = m0_net_domain_get_max_buffer_size(nd);
240  rc = m0_rpc_bulk_buf_databuf_add(rbuf, sbuf, max_buf_size + 1, 0, nd);
241  /* Max buffer size greater than permitted max buffer size. */
242  M0_UT_ASSERT(rc == -EMSGSIZE);
243 
244  /* Test : m0_rpc_bulk_buflist_empty() */
246  M0_UT_ASSERT(m0_tlist_is_empty(&rpcbulk_tl, &rbulk->rb_buflist));
247 
248  rc = m0_rpc_bulk_buf_add(rbulk, IO_SINGLE_BUFFER, 0, nd, NULL, &rbuf);
249  M0_UT_ASSERT(rc == 0);
250  M0_UT_ASSERT(rbuf != NULL);
251  rc = m0_rpc_bulk_buf_databuf_add(rbuf, sbuf, M0_0VEC_ALIGN, 0, nd);
252  M0_UT_ASSERT(rc == 0);
253 
254  /* Test : m0_rpc_bulk_buf_add(nb != NULL)*/
255  M0_ALLOC_PTR(nb);
256  M0_UT_ASSERT(nb != NULL);
258  M0_0VEC_ALIGN, M0_0VEC_SHIFT);
259  M0_UT_ASSERT(rc == 0);
260  memset(nb->nb_buffer.ov_buf[IO_SINGLE_BUFFER - 1], 'a', M0_0VEC_ALIGN);
261 
263  nd, nb, &rbuf1);
264  M0_UT_ASSERT(rc == 0);
265  M0_UT_ASSERT(rbuf1 != NULL);
266  M0_UT_ASSERT(rbuf1->bb_nbuf == nb);
268 
269  M0_UT_ASSERT(rbuf->bb_nbuf != NULL);
270  /* In normal code path, an io_request is allocated. io_request embeds io_fop.
271  * In this UT, only io_fop is allocated here. So, skip di_prepare here. */
272  m0_fi_enable("io_fop_di_prepare", "skip_di_for_ut");
273  rc = m0_io_fop_prepare(&iofop->if_fop);
274  M0_UT_ASSERT(rc == 0);
275  m0_fi_disable("io_fop_di_prepare", "skip_di_for_ut");
276  rw = io_rw_get(&iofop->if_fop);
277  M0_UT_ASSERT(rw != NULL);
278 
279  M0_ALLOC_PTR(ctm);
280  M0_UT_ASSERT(ctm != NULL);
281  ctm->bmt_addr = caddr;
282  bulkio_msg_tm_init(ctm, nd);
283 
284  rc = m0_rpc_bulk_store(rbulk, &ctm->bmt_conn, rw->crw_desc.id_descs,
286  M0_UT_ASSERT(rc == 0);
287 
288  /*
289  * There is no ACTIVE side _yet_ to start the bulk transfer and
290  * hence the buffer is guaranteed to stay put in the
291  * PASSIVE_BULK_SEND queue of TM.
292  */
293  m0_mutex_lock(&rbulk->rb_mutex);
294  m0_tl_for(rpcbulk, &rbulk->rb_buflist, rbuf) {
297  M0_UT_ASSERT(rbuf->bb_nbuf != NULL);
298  M0_UT_ASSERT(rbuf->bb_nbuf->nb_app_private == rbuf);
301  rc = memcmp(rbuf->bb_nbuf->nb_desc.nbd_data,
303  rbuf->bb_nbuf->nb_desc.nbd_len);
304  M0_UT_ASSERT(rc == 0);
305  ++i;
306  } m0_tl_endfor;
307  M0_UT_ASSERT(rbulk->rb_bytes == 2 * M0_0VEC_ALIGN);
308  /* Register a clink for client side rbulk. */
309  m0_clink_init(&c_clink, NULL);
310  m0_clink_add(&rbulk->rb_chan, &c_clink);
311  m0_mutex_unlock(&rbulk->rb_mutex);
312 
313  /* Start server side TM. */
314  M0_ALLOC_PTR(stm);
315  M0_UT_ASSERT(stm != NULL);
316  stm->bmt_addr = saddr;
317  bulkio_msg_tm_init(stm, nd);
318 
319  /*
320  * Bulk server (receiving side) typically uses m0_rpc_bulk structure
321  * without having to use m0_io_fop.
322  */
323  M0_ALLOC_PTR(sbulk);
324  M0_UT_ASSERT(sbulk != NULL);
325 
326  /*
327  * Pretends that io fop is received and starts zero copy.
328  * Actual fop can not be sent since rpc server hands over any
329  * incoming fop to associated request handler. And request
330  * handler does not work in kernel space at the moment.
331  */
332  m0_rpc_bulk_init(sbulk);
333 
334  M0_ALLOC_ARR(nbufs, rw->crw_desc.id_nr);
335  M0_UT_ASSERT(nbufs != NULL);
336  for (i = 0; i < rw->crw_desc.id_nr; ++i) {
337  M0_ALLOC_PTR(nbufs[i]);
338  M0_UT_ASSERT(nbufs[i] != NULL);
339  rc = m0_bufvec_alloc_aligned(&nbufs[i]->nb_buffer, 1,
340  M0_0VEC_ALIGN, M0_0VEC_SHIFT);
341  M0_UT_ASSERT(rc == 0);
342  }
343 
344  for (i = 0; i < rw->crw_desc.id_nr; ++i) {
345  rc = m0_rpc_bulk_buf_add(sbulk, 1, 0, nd, nbufs[i],
346  &rbuf2);
347  M0_UT_ASSERT(rc == 0);
348  M0_UT_ASSERT(rbuf2 != NULL);
349  }
350 
351  m0_mutex_lock(&sbulk->rb_mutex);
354  m0_rpc_bulk_qtype(sbulk, q);
355  m0_clink_init(&s_clink, NULL);
356  m0_clink_add(&sbulk->rb_chan, &s_clink);
357  m0_mutex_unlock(&sbulk->rb_mutex);
358 
359  rc = m0_rpc_bulk_load(sbulk, &stm->bmt_conn, rw->crw_desc.id_descs,
361 
362  /*
363  * Buffer completion callbacks also wait to acquire the
364  * m0_rpc_bulk::rb_mutex and in any case asserts inside the loop
365  * are protected from buffer completion callbacks which do some
366  * cleanup due to the lock.
367  */
368  m0_mutex_lock(&sbulk->rb_mutex);
369  m0_tl_for(rpcbulk, &sbulk->rb_buflist, rbuf2) {
370  M0_UT_ASSERT(rbuf2->bb_nbuf != NULL);
372  M0_UT_ASSERT(rbuf2->bb_nbuf->nb_app_private == rbuf2);
373  M0_UT_ASSERT(rbuf2->bb_rbulk == sbulk);
376  } m0_tl_endfor;
377  M0_UT_ASSERT(sbulk->rb_bytes == 2 * M0_0VEC_ALIGN);
378  m0_mutex_unlock(&sbulk->rb_mutex);
379  /* Waits for zero copy to complete. */
380  M0_UT_ASSERT(rc == 0);
381  m0_chan_wait(&s_clink);
382  m0_chan_wait(&c_clink);
383 
384  m0_mutex_lock(&sbulk->rb_mutex);
385  m0_mutex_lock(&rbulk->rb_mutex);
386  M0_UT_ASSERT(m0_tlist_is_empty(&rpcbulk_tl, &sbulk->rb_buflist));
387  m0_clink_del(&s_clink);
388  m0_clink_del(&c_clink);
389  m0_mutex_unlock(&rbulk->rb_mutex);
390  m0_mutex_unlock(&sbulk->rb_mutex);
391 
392  m0_clink_fini(&s_clink);
393  m0_clink_fini(&c_clink);
394  m0_rpc_bulk_fini(sbulk);
395  for (i = 0; i < rw->crw_desc.id_nr; ++i) {
396  rc = memcmp(nbufs[i]->nb_buffer.ov_buf[IO_SINGLE_BUFFER - 1],
397  sbuf, M0_0VEC_ALIGN);
398  M0_UT_ASSERT(rc == 0);
399  }
400 
401  /* Rpc bulk op timeout */
402 
403  rc = m0_rpc_bulk_buf_add(rbulk, IO_SINGLE_BUFFER, 0, nd, nb, &rbuf1);
404  M0_UT_ASSERT(rc == 0);
405  rc = m0_rpc_bulk_store(rbulk, &ctm->bmt_conn, rw->crw_desc.id_descs,
407  M0_UT_ASSERT(rc == 0);
408  m0_mutex_lock(&rbulk->rb_mutex);
409  m0_clink_init(&c_clink, NULL);
410  m0_clink_add(&rbulk->rb_chan, &c_clink);
412  M0_UT_ASSERT(rc == 0);
413  m0_mutex_unlock(&rbulk->rb_mutex);
414  m0_rpc_bulk_store_del(rbulk);
415  m0_chan_wait(&c_clink);
416  m0_mutex_lock(&rbulk->rb_mutex);
417  m0_clink_del(&c_clink);
418  m0_mutex_unlock(&rbulk->rb_mutex);
419  m0_clink_fini(&c_clink);
420 
421  m0_rpc_bulk_init(sbulk);
422  for (i = 0; i < rw->crw_desc.id_nr; ++i) {
423  rc = m0_rpc_bulk_buf_add(sbulk, 1, 0, nd, nbufs[i], &rbuf2);
424  M0_UT_ASSERT(rc == 0);
425  M0_UT_ASSERT(rbuf2 != NULL);
426  }
427 
428  m0_mutex_lock(&sbulk->rb_mutex);
429  m0_rpc_bulk_qtype(sbulk, q);
430  m0_clink_init(&s_clink, NULL);
431  m0_clink_add(&sbulk->rb_chan, &s_clink);
432  m0_mutex_unlock(&sbulk->rb_mutex);
433  m0_fi_enable("rpc_bulk_op", "timeout_2s");
434  rc = m0_rpc_bulk_load(sbulk, &stm->bmt_conn, rw->crw_desc.id_descs,
436  M0_UT_ASSERT(rc == 0);
437  m0_chan_wait(&s_clink);
438  m0_fi_disable("rpc_bulk_op", "timeout_2s");
439  m0_mutex_lock(&sbulk->rb_mutex);
440  M0_UT_ASSERT(m0_tlist_is_empty(&rpcbulk_tl, &sbulk->rb_buflist));
441  m0_clink_del(&s_clink);
442  m0_mutex_unlock(&sbulk->rb_mutex);
443 
444  m0_clink_fini(&s_clink);
445 
446 
447  bulkio_msg_tm_fini(ctm);
448  bulkio_msg_tm_fini(stm);
449  m0_free(ctm);
450  m0_free(stm);
451 
452  for (i = 0; i < rw->crw_desc.id_nr; ++i) {
453  m0_bufvec_free_aligned(&nbufs[i]->nb_buffer, M0_0VEC_SHIFT);
454  m0_free(nbufs[i]);
455  }
456  m0_free(nbufs);
457  m0_free(sbulk);
458 
459  m0_io_fop_destroy(&iofop->if_fop);
461  M0_UT_ASSERT(rw->crw_desc.id_nr == 0);
463  M0_UT_ASSERT(rw->crw_ivec.ci_nr == 0);
464 
466  M0_UT_ASSERT(m0_tlist_is_empty(&rpcbulk_tl, &rbulk->rb_buflist));
467 
468  m0_bufvec_free_aligned(&nb->nb_buffer, M0_0VEC_SHIFT);
469  m0_free(nb);
470 
471  /* Cleanup. */
472  m0_free_aligned(sbuf, M0_0VEC_ALIGN, M0_0VEC_SHIFT);
473  m0_io_fop_fini(iofop);
474  m0_free(iofop);
476  m0_free(nd);
478  m0_free(file);
479  m0_file_lock_type_deregister(&flock_rt);
480  m0_rm_domain_fini(rm_dom);
481  m0_free(rm_dom);
482 }
483 
485  .ts_name = "bulk-client-ut",
486  .ts_tests = {
487  { "bulkclient_test", bulkclient_test},
488  { NULL, NULL }
489  }
490 };
491 M0_EXPORTED(bulkio_client_ut);
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_segment_size(struct m0_net_domain *dom)
struct m0_ut_suite bulkio_client_ut
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
void m0_net_domain_fini(struct m0_net_domain *dom)
Definition: domain.c:71
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: tm.c:261
static struct m0_semaphore q
Definition: rwlock.c:55
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
struct m0_rpc_bulk * bb_rbulk
Definition: bulk.h:184
M0_INTERNAL int m0_rpc_bulk_store(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *to_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:520
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
struct m0_bufvec nb_buffer
Definition: net.h:1322
uint32_t nbd_len
Definition: net_otw_types.h:37
static struct m0_net_tm_callbacks bulkio_ut_tm_cb
Definition: bulkio_client.c:47
static void bulkio_msg_tm_init(struct bulkio_msg_tm *bmt, struct m0_net_domain *nd)
Definition: bulkio_client.c:67
struct m0_file file
Definition: di.c:36
static void bulkio_tm_cb(const struct m0_net_tm_event *ev)
Definition: bulkio_client.c:43
M0_INTERNAL int m0_rpc_bulk_buf_databuf_add(struct m0_rpc_bulk_buf *rbuf, void *buf, m0_bcount_t count, m0_bindex_t index, struct m0_net_domain *netdom)
Definition: bulk.c:331
const char * bmt_addr
Definition: bulkio_client.c:64
uint8_t * nbd_data
Definition: net_otw_types.h:38
struct m0_vec ov_vec
Definition: vec.h:147
struct m0_chan rb_chan
Definition: bulk.h:258
enum m0_net_tm_state ntm_state
Definition: net.h:819
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:263
struct m0_rpc_bulk if_rbulk
Definition: io_fops.h:177
uint64_t nb_flags
Definition: net.h:1489
uint64_t t_magic
Definition: tlist.h:256
struct m0_net_buf_desc_data * id_descs
Definition: io_fops.h:313
M0_INTERNAL void m0_free_aligned(void *data, size_t size, unsigned shift)
Definition: memory.c:192
M0_INTERNAL void m0_rm_domain_init(struct m0_rm_domain *dom)
Definition: rm.c:215
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
uint32_t ci_nr
Definition: vec.h:635
Definition: ut.h:77
static int struct dentry int struct nameidata * nd
Definition: dir.c:593
M0_INTERNAL void m0_io_fop_destroy(struct m0_fop *fop)
Definition: io_fops.c:1581
void ** ov_buf
Definition: vec.h:149
M0_INTERNAL bool m0_tlist_is_empty(const struct m0_tl_descr *d, const struct m0_tl *list)
Definition: tlist.c:96
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
#define m0_tl_endfor
Definition: tlist.h:700
struct m0_fid fid
Definition: di.c:46
int m0_bufvec_alloc_aligned(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size, unsigned shift)
Definition: vec.c:355
M0_INTERNAL void m0_file_lock_type_deregister(struct m0_rm_resource_type *flock_rt)
Definition: file.c:561
struct m0_chan ntm_chan
Definition: net.h:874
M0_INTERNAL int m0_file_lock_type_register(struct m0_rm_domain *dom, struct m0_rm_resource_type *flock_rt)
Definition: file.c:549
uint64_t if_magic
Definition: io_fops.h:179
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:530
int i
Definition: dir.c:1033
M0_INTERNAL struct m0_rpc_bulk * m0_fop_to_rpcbulk(const struct m0_fop *fop)
Definition: io_fops.c:904
struct m0_fop_type * f_type
Definition: fop.h:81
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
static void bulkio_msg_tm_fini(struct bulkio_msg_tm *bmt)
static void bulkclient_test(void)
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
Definition: tm.c:160
struct m0_fop if_fop
Definition: io_fops.h:174
M0_INTERNAL void m0_fi_disable(const char *fp_func, const char *fp_tag)
Definition: finject.c:485
struct m0_net_buffer * bb_nbuf
Definition: bulk.h:177
static void m0_fi_enable(const char *func, const char *tag)
Definition: finject.h:276
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
int32_t rb_rc
Definition: bulk.h:266
struct m0_io_descs crw_desc
Definition: io_fops.h:400
#define M0_ASSERT(cond)
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
Definition: tm.c:204
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:247
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
uint64_t bb_flags
Definition: bulk.h:186
void * nb_app_private
Definition: net.h:1477
struct m0_net_xprt * m0_net_xprt_default_get(void)
Definition: net.c:151
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
const struct m0_net_buffer_callbacks * nb_callbacks
Definition: net.h:1369
uint64_t f_container
Definition: fid.h:39
struct m0_0vec bb_zerovec
Definition: bulk.h:179
uint64_t bb_magic
Definition: bulk.h:175
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_size(struct m0_net_domain *dom)
struct m0_rpc_conn bmt_conn
Definition: bulkio_client.c:63
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
Definition: tm.c:293
M0_INTERNAL void m0_bufvec_free_aligned(struct m0_bufvec *bufvec, unsigned shift)
Definition: vec.c:436
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
struct m0_bufvec z_bvec
Definition: vec.h:514
M0_INTERNAL bool m0_tlink_is_in(const struct m0_tl_descr *d, const void *obj)
Definition: tlist.c:103
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
const char * rt_name
Definition: rm.h:389
const char * ts_name
Definition: ut.h:99
M0_INTERNAL size_t m0_rpc_bulk_store_del_unqueued(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:190
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
Definition: domain.c:36
Definition: fid.h:38
uint64_t f_key
Definition: fid.h:40
m0_net_queue_type
Definition: net.h:591
uint64_t rb_magic
Definition: bulk.h:249
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
uint32_t id_nr
Definition: io_fops.h:312
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
m0_bcount_t rb_bytes
Definition: bulk.h:260
struct m0_rpc_machine bmt_mach
Definition: bulkio_client.c:62
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL int32_t m0_net_domain_get_max_buffer_segments(struct m0_net_domain *dom)
struct m0_net_buf_desc bdd_desc
Definition: net_otw_types.h:47
M0_INTERNAL void m0_file_fini(struct m0_file *file)
Definition: file.c:498
M0_INTERNAL void m0_rpc_bulk_qtype(struct m0_rpc_bulk *rbulk, enum m0_net_queue_type q)
Definition: bulk.c:372
struct m0_tl rb_buflist
Definition: bulk.h:256
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
M0_INTERNAL void m0_io_fop_fini(struct m0_io_fop *iofop)
Definition: io_fops.c:897
M0_INTERNAL int m0_io_fop_init(struct m0_io_fop *iofop, const struct m0_fid *gfid, struct m0_fop_type *ftype, void(*fop_release)(struct m0_ref *))
Definition: io_fops.c:865
M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:215
Definition: file.h:81
M0_INTERNAL bool m0_is_read_fop(const struct m0_fop *fop)
Definition: io_fops.c:916
Definition: di.h:73
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
Definition: bulk.c:238
M0_INTERNAL void * m0_alloc_aligned(size_t size, unsigned shift)
Definition: memory.c:168
struct m0_fop_cob_rw * io_rw_get(struct m0_fop *fop)
Definition: io_fops.c:1037
M0_INTERNAL int m0_io_fop_prepare(struct m0_fop *fop)
Definition: io_fops.c:1513
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
Definition: bulk.c:291
struct m0_fop_type m0_fop_cob_writev_fopt
Definition: io_fops.c:72
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
struct m0_rpc_item f_item
Definition: fop.h:83
struct m0_ioseg * ci_iosegs
Definition: vec.h:636
struct m0_io_indexvec crw_ivec
Definition: io_fops.h:411
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL void m0_rm_domain_fini(struct m0_rm_domain *dom)
Definition: rm.c:227
const struct m0_net_tm_callbacks * ntm_callbacks
Definition: net.h:816
#define M0_UT_ASSERT(a)
Definition: ut.h:46
Definition: fop.h:79
struct m0_mutex rb_mutex
Definition: bulk.h:251
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)
M0_INTERNAL void m0_file_init(struct m0_file *file, const struct m0_fid *fid, struct m0_rm_domain *dom, enum m0_di_types di_type)
Definition: file.c:477