Motr (M0) — rpc/at.c: implementation of RPC Adaptive Transmission (AT)
buffers, which carry payloads either inline in a fop or via RPC bulk.
(This listing is a Doxygen rendering of the file; some source lines were
dropped wherever the generator hyperlinked a call.)
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 #include "lib/errno.h"
28 #include "lib/chan.h" /* m0_clink */
29 #include "lib/vec.h"
30 #include "sm/sm.h" /* m0_sm_ast */
31 #include "net/net.h" /* m0_net_queue_type */
32 #include "rpc/session.h" /* m0_rpc_session */
33 #include "rpc/bulk.h" /* m0_rpc_bulk */
34 #include "rpc/rpc_machine.h" /* m0_rpc_machine */
35 #include "rpc/conn.h"
36 #include "fop/fom.h"
37 #include "fop/fop.h"
38 #include "rpc/at.h"
39 
46 M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN);
47 M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf);
48 
/*
 * Per-AT-buffer context for data transferred via RPC bulk.
 * NOTE(review): this is a Doxygen scrape; member lines 50-52, 54-69 and 71
 * (ac_atbuf, ac_nb, ac_clink, ac_user_fom, ac_bulk, per the index below)
 * were dropped by the rendering.
 */
49 struct rpc_at_bulk {
/* Connection the bulk transfer is done over (also supplies the net domain). */
 53  const struct m0_rpc_conn *ac_conn;
/* Received data, spliced lazily out of the landed network buffer. */
 56  struct m0_buf ac_recv;
/* AST finalising a server-side bulk send from the user fom's locality. */
 70  struct m0_sm_ast ac_ast;
/* First error hit during the AT bulk transfer, 0 if none. */
 72  int ac_rc;
 73 };
74 
75 static struct rpc_at_bulk *rpc_at_bulk(const struct m0_rpc_at_buf *ab)
76 {
77  return ab->u.ab_extra.abr_bulk;
78 }
79 
/*
 * NOTE(review): rpc_at_bulk_cutoff() — per the index it presumably returns
 * the connection's rm_bulk_cutoff; the signature (line 80) and body
 * (line 82) were dropped by the Doxygen rendering.
 */
 81 {
 83 }
84 
85 static struct m0_net_domain *rpc_at_bulk_ndom(const struct rpc_at_bulk *atbulk)
86 {
87  return atbulk->ac_conn->c_rpc_machine->rm_tm.ntm_dom;
88 }
89 
90 static struct m0_rpc_conn *fom_conn(const struct m0_fom *fom)
91 {
92  return fom->fo_fop->f_item.ri_session->s_conn;
93 }
94 
/*
 * Number of net-buffer segments needed to carry "data_size" bytes; on
 * return *seg_size holds the chosen segment size, clamped to at most 1MB.
 * NOTE(review): parameter lines 96-97 and the first statement (line 99,
 * presumably m0_net_domain_get_max_buffer_segment_size() per the index)
 * were dropped by the Doxygen rendering.
 */
 95 static uint64_t rpc_at_bulk_segs_nr(const struct rpc_at_bulk *atbulk,
 98 {
 100  rpc_at_bulk_ndom(atbulk));
 101  /*
 102  * Arbitrary select 1MB as the maximum segment size.
 103  */
 104  *seg_size = min64u(*seg_size, 1024*1024);
/* Round up: the last segment may be only partially filled. */
 105  return (data_size + *seg_size - 1) / *seg_size;
 106 }
107 
/*
 * Allocates the page-aligned network buffer vector used to land "size"
 * bytes of bulk data. NOTE(review): lines 113, 117-118, 120 and 123 were
 * dropped by the rendering — line 120 presumably calls
 * m0_bufvec_alloc_aligned(bvec, segs_nr, seg_size, m0_pageshift_get())
 * per the index below.
 */
 108 static int rpc_at_bulk_nb_alloc(struct m0_rpc_at_buf *ab, uint64_t size)
 109 {
 110  struct rpc_at_bulk *atbulk = rpc_at_bulk(ab);
 111  struct m0_net_buffer *nb = &atbulk->ac_nb;
 112  struct m0_bufvec *bvec = &nb->nb_buffer;
 114  uint64_t segs_nr;
 115  int rc;
 116 
 119  segs_nr = rpc_at_bulk_segs_nr(atbulk, size, &seg_size);
 121  m0_pageshift_get());
 122  if (rc == 0)
 124  return M0_RC(rc);
 125 }
126 
/*
 * Frees the aligned network buffer vector allocated by
 * rpc_at_bulk_nb_alloc(). NOTE(review): the body (line 129, presumably
 * m0_bufvec_free_aligned per the index) was dropped by the rendering.
 */
 127 static void rpc_at_bulk_nb_free(struct rpc_at_bulk *atbulk)
 128 {
 130 }
131 
132 static int rpc_at_bulk_init(struct m0_rpc_at_buf *ab,
133  const struct m0_rpc_conn *conn)
134 {
135  struct rpc_at_bulk *atbulk;
136 
137  M0_PRE(ab != NULL);
138  M0_PRE(conn != NULL);
139 
140  M0_ENTRY();
141  M0_ALLOC_PTR(atbulk);
142  if (atbulk == NULL)
143  return M0_ERR(-ENOMEM);
144  ab->u.ab_extra.abr_bulk = atbulk;
145  atbulk->ac_atbuf = ab;
146  atbulk->ac_conn = conn;
147  atbulk->ac_rc = 0;
148  atbulk->ac_recv = M0_BUF_INIT0;
149  m0_rpc_bulk_init(&atbulk->ac_bulk);
150  return M0_RC(0);
151 }
152 
/*
 * Deletes stored (queued) bulk buffers and waits on rb_chan until the
 * deletion completes. NOTE(review): lines 157 and 160-162 (clink init,
 * m0_chan_wait, clink del/fini, per the index below) were dropped by the
 * rendering.
 */
 153 static void rpc_at_bulk_store_del(struct m0_rpc_bulk *rbulk)
 154 {
 155  struct m0_clink clink;
 156 
 158  m0_clink_add_lock(&rbulk->rb_chan, &clink);
 159  m0_rpc_bulk_store_del(rbulk);
 163 }
164 
/*
 * Queue type of the buffers in a non-empty bulk list; all elements are
 * asserted to share the same type, so the head's type is representative.
 * NOTE(review): the signature line (165, "static enum m0_net_queue_type
 * rpc_at_bulk_qtype(struct m0_rpc_bulk *rbulk)" per the index) was
 * dropped by the rendering.
 */
 166 {
 167  struct m0_rpc_bulk_buf *rbuf;
 168  enum m0_net_queue_type ret;
 169 
 170  M0_PRE(!m0_rpc_bulk_is_empty(rbulk));
 171 
 172  m0_mutex_lock(&rbulk->rb_mutex);
 173  rbuf = rpcbulk_tlist_head(&rbulk->rb_buflist);
 174  ret = rbuf->bb_nbuf->nb_qtype;
 175 
 176  /* Check that all elements have the same type. */
 177  M0_ASSERT(m0_tl_forall(rpcbulk, rbuf, &rbulk->rb_buflist,
 178  rbuf->bb_nbuf->nb_qtype == ret));
 179  m0_mutex_unlock(&rbulk->rb_mutex);
 180  return ret;
 181 }
182 
/*
 * Releases every resource attached to the AT bulk context (queued bulk
 * buffers, spliced receive buffer, aligned net buffer) and frees the
 * context itself. NOTE(review): lines 191-192 (remainder of the
 * store-del condition) and 194 were dropped by the rendering.
 */
 183 static void rpc_at_bulk_fini(struct rpc_at_bulk *atbulk)
 184 {
 185  struct m0_rpc_bulk *rbulk = &atbulk->ac_bulk;
 186 
 187  M0_PRE(atbulk != NULL);
 188  M0_PRE(rbulk != NULL);
 189 
 190  if (!m0_rpc_bulk_is_empty(rbulk) &&
 193  rpc_at_bulk_store_del(rbulk);
 195  m0_rpc_bulk_fini(rbulk);
 196  if (m0_buf_is_set(&atbulk->ac_recv))
 197  m0_buf_free(&atbulk->ac_recv);
 198  rpc_at_bulk_nb_free(atbulk);
 199  m0_free(atbulk);
 200 }
201 
/*
 * Client-side bulk send: splits "buf" into segments sized by
 * rpc_at_bulk_segs_nr(), attaches them to a bulk buffer and stores the
 * network descriptors into ab->u.ab_send for the server to read from.
 * NOTE(review): lines 210, 214, 221 (presumably the
 * m0_rpc_bulk_buf_databuf_add() call per the index), 226 and 230 (bulk
 * callbacks argument) were dropped by the rendering.
 */
 202 static int rpc_at_bulk_csend(struct m0_rpc_at_buf *ab,
 203  const struct m0_buf *buf)
 204 {
 205  struct rpc_at_bulk *atbulk = rpc_at_bulk(ab);
 206  struct m0_rpc_bulk_buf *rbuf;
 207  struct m0_net_domain *nd;
 208  uint64_t segs_nr;
 209  int i;
 211  m0_bcount_t blen = buf->b_nob;
 212  int rc;
 213 
 215  nd = rpc_at_bulk_ndom(atbulk);
 216  segs_nr = rpc_at_bulk_segs_nr(atbulk, blen, &seg_size);
 217  rc = m0_rpc_bulk_buf_add(&atbulk->ac_bulk, segs_nr, blen,
 218  nd, NULL, &rbuf);
 219  if (rc == 0) {
 220  for (i = 0; i < segs_nr; ++i) {
 222  buf->b_addr + i * seg_size,
 223  min_check(blen, seg_size), i, nd);
 224  blen -= seg_size;
 225  }
 227  rc = m0_rpc_bulk_store(&atbulk->ac_bulk,
 228  atbulk->ac_conn,
 229  &ab->u.ab_send,
 231  }
 232  return M0_RC(rc);
 233 }
234 
/*
 * Server-side bulk receive: allocates a landing buffer sized by the
 * descriptor in ab->u.ab_send, arms the fom to wait on bulk completion
 * and starts the transfer with m0_rpc_bulk_load(). On failure the armed
 * fom callback is cancelled and the error is cached in ac_rc.
 * NOTE(review): lines 242, 246, 258 (bulk callbacks argument) and 263
 * were dropped by the rendering.
 */
 235 static int rpc_at_bulk_srecv(struct m0_rpc_at_buf *ab, struct m0_fom *fom)
 236 {
 237  struct rpc_at_bulk *atbulk = rpc_at_bulk(ab);
 238  struct m0_rpc_bulk *rbulk = &atbulk->ac_bulk;
 239  struct m0_rpc_bulk_buf *rbuf;
 240  struct m0_net_domain *nd;
 241  uint64_t segs_nr;
 243  uint64_t size = ab->u.ab_send.bdd_used;
 244  int rc;
 245 
 247  nd = rpc_at_bulk_ndom(atbulk);
 248  segs_nr = rpc_at_bulk_segs_nr(atbulk, size, &seg_size);
 249  rc = rpc_at_bulk_nb_alloc(ab, size) ?:
 250  m0_rpc_bulk_buf_add(rbulk, segs_nr, size,
 251  nd, &atbulk->ac_nb, &rbuf);
 252  if (rc == 0) {
 253  m0_mutex_lock(&rbulk->rb_mutex);
 254  m0_fom_wait_on(fom, &rbulk->rb_chan, &fom->fo_cb);
 255  m0_mutex_unlock(&rbulk->rb_mutex);
 256 
 257  rc = m0_rpc_bulk_load(rbulk, fom_conn(fom), &ab->u.ab_send,
 259  if (rc != 0) {
 260  m0_mutex_lock(&rbulk->rb_mutex);
 261  m0_fom_callback_cancel(&fom->fo_cb);
 262  m0_mutex_unlock(&rbulk->rb_mutex);
 264  }
 265  }
 266  if (rc != 0)
 267  atbulk->ac_rc = M0_ERR(rc);
 268  return M0_RC(rc);
 269 }
270 
271 static int rpc_at_bulk_srecv_rc(const struct m0_rpc_at_buf *ab,
272  struct m0_buf *buf)
273 {
274  struct rpc_at_bulk *atbulk = rpc_at_bulk(ab);
275  struct m0_bufvec *bvec = &atbulk->ac_nb.nb_buffer;
276  struct m0_buf *recv = &atbulk->ac_recv;
277  int rc;
278 
279  rc = atbulk->ac_rc ?:
280  atbulk->ac_bulk.rb_rc;
281  if (rc == 0 && !m0_buf_is_set(recv))
282  rc = m0_bufvec_splice(bvec, atbulk->ac_nb.nb_length, recv);
283  if (rc == 0)
284  *buf = *recv;
285  return M0_RC(rc);
286 }
287 
/*
 * Client-side bulk receive: allocates a landing buffer of "len" bytes
 * and stores its network descriptors into ab->u.ab_recv so the server
 * can send the reply payload into it. NOTE(review): lines 296, 299,
 * 306, 308 and 310 were dropped by the rendering.
 */
 288 static int rpc_at_bulk_crecv(struct m0_rpc_at_buf *ab,
 289  uint32_t len)
 290 {
 291  struct rpc_at_bulk *atbulk = rpc_at_bulk(ab);
 292  struct m0_rpc_bulk *rbulk = &atbulk->ac_bulk;
 293  struct m0_rpc_bulk_buf *rbuf;
 294  struct m0_net_domain *nd;
 295  uint64_t segs_nr;
 297  int rc;
 298 
 300  nd = rpc_at_bulk_ndom(atbulk);
 301  segs_nr = rpc_at_bulk_segs_nr(atbulk, len, &seg_size);
 302  rc = rpc_at_bulk_nb_alloc(ab, len) ?:
 303  m0_rpc_bulk_buf_add(rbulk, segs_nr, len, nd,
 304  &atbulk->ac_nb, &rbuf);
 305  if (rc == 0) {
 307  rc = m0_rpc_bulk_store(rbulk, atbulk->ac_conn, &ab->u.ab_recv,
 309  if (rc != 0)
 311  }
 312  return M0_RC(rc);
 313 }
314 
/*
 * AST run in the user fom's locality once a server-side bulk send has
 * completed: finalises the bulk context, frees the user buffer and wakes
 * the fom. NOTE(review): line 324 (apparently a condition guarding the
 * m0_buf_free() call, e.g. m0_buf_is_set) was dropped by the rendering.
 */
 315 static void rpc_at_ssend_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
 316 {
 317  struct rpc_at_bulk *atbulk = M0_AMB(atbulk, ast, ac_ast);
 318  struct m0_rpc_at_buf *atbuf = atbulk->ac_atbuf;
 319  struct m0_rpc_at_extra *atextra = &atbuf->u.ab_extra;
 320  struct m0_fom *user_fom = atbulk->ac_user_fom;
 321 
 322  rpc_at_bulk_fini(atbulk);
 323  atextra->abr_bulk = NULL;
 325  m0_buf_free(&atextra->abr_user_buf);
 326  M0_ASSERT(user_fom != NULL);
 327  m0_fom_wakeup(user_fom);
 328 }
329 
/*
 * Clink callback invoked on bulk-send completion: records the bulk
 * result into the reply's abr_rc and posts rpc_at_ssend_ast_cb() to the
 * user fom's locality for finalisation. NOTE(review): the signature line
 * (330, "static bool rpc_at_ssend_complete_cb(struct m0_clink *clink)"
 * per the index) and lines 336-337 were dropped by the rendering.
 */
 331 {
 332  struct rpc_at_bulk *atbulk = M0_AMB(atbulk, clink, ac_clink);
 333  struct m0_rpc_at_buf *atbuf = atbulk->ac_atbuf;
 334 
 335  atbuf->u.ab_rep.abr_rc = atbulk->ac_bulk.rb_rc;
 338  atbulk->ac_ast.sa_cb = rpc_at_ssend_ast_cb;
 339  m0_sm_ast_post(&atbulk->ac_user_fom->fo_loc->fl_group, &atbulk->ac_ast);
 340  return true;
 341 }
342 
/*
 * Server-side bulk send of a reply buffer: segments "buf", registers the
 * completion clink and starts the transfer towards the client's landing
 * buffer described by in->u.ab_recv. On success, ownership of "buf" is
 * recorded in out->u.ab_extra.abr_user_buf so it is freed when the
 * transmission completes (see rpc_at_ssend_ast_cb). NOTE(review): lines
 * 353, 360, 369 (the m0_rpc_bulk_buf_databuf_add() call per the index),
 * 375 (clink init with rpc_at_ssend_complete_cb), 379, 381 and 383-385
 * (error cleanup) were dropped by the rendering.
 */
 343 static int rpc_at_bulk_ssend(struct m0_rpc_at_buf *in,
 344  struct m0_rpc_at_buf *out,
 345  struct m0_buf *buf,
 346  struct m0_fom *fom)
 347 {
 348  struct rpc_at_bulk *atbulk = rpc_at_bulk(out);
 349  struct m0_rpc_bulk *rbulk = &atbulk->ac_bulk;
 350  struct m0_rpc_bulk_buf *rbuf;
 351  struct m0_net_domain *nd;
 352  uint64_t segs_nr;
 354  m0_bcount_t blen = buf->b_nob;
 355  struct m0_clink *clink = &atbulk->ac_clink;
 356  int i;
 357  int rc;
 358 
 359  M0_PRE(in != NULL);
 361  M0_PRE(out->ab_type == M0_RPC_AT_BULK_REP);
 362  M0_PRE(in->u.ab_recv.bdd_used >= blen);
 363  nd = rpc_at_bulk_ndom(atbulk);
 364  segs_nr = rpc_at_bulk_segs_nr(atbulk, blen, &seg_size);
 365  rc = m0_rpc_bulk_buf_add(&atbulk->ac_bulk, segs_nr, blen,
 366  nd, NULL, &rbuf);
 367  if (rc == 0) {
 368  for (i = 0; i < segs_nr; ++i) {
 370  buf->b_addr + i * seg_size,
 371  min_check(blen, seg_size), i, nd);
 372  blen -= seg_size;
 373  }
 374 
 376  m0_clink_add_lock(&rbulk->rb_chan, clink);
 377  atbulk->ac_user_fom = fom;
 378 
 380  rc = m0_rpc_bulk_load(rbulk, fom_conn(fom), &in->u.ab_recv,
 382  if (rc != 0) {
 386  } else {
 387  /*
 388  * Store reference to the user buffer to free it when
 389  * transmission is complete.
 390  */
 391  out->u.ab_extra.abr_user_buf = *buf;
 392  }
 393  }
 394  if (rc != 0)
 395  atbulk->ac_rc = M0_ERR(rc);
 396  return M0_RC(rc);
 397 }
398 
399 M0_INTERNAL int m0_rpc_at_get(const struct m0_rpc_at_buf *ab,
400  struct m0_buf *buf)
401 {
402  switch (ab->ab_type) {
403  case M0_RPC_AT_INLINE:
404  *buf = ab->u.ab_buf;
405  return 0;
406  case M0_RPC_AT_BULK_SEND:
407  return rpc_at_bulk_srecv_rc(ab, buf);
408  default:
409  return M0_ERR_INFO(-EPROTO, "Incorrect AT type %u",
410  ab->ab_type);
411  }
412 }
413 
414 M0_INTERNAL int m0_rpc_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom,
415  int next_phase)
416 {
417  struct m0_rpc_conn *conn = fom_conn(fom);
418  int result = M0_FSO_AGAIN;
419  int rc;
420 
421  m0_fom_phase_set(fom, next_phase);
422 
423  /* ab is probably received from network, don't assert on its data. */
424  if (ab->ab_type == M0_RPC_AT_BULK_SEND) {
425  rc = rpc_at_bulk_init(ab, conn) ?:
426  rpc_at_bulk_srecv(ab, fom);
427  if (rc == 0)
428  result = M0_FSO_WAIT;
429  }
430  return result;
431 }
432 
/*
 * Initialises an AT buffer to the empty state with no attached user
 * buffer and no bulk context. NOTE(review): line 435 was dropped by the
 * rendering (likely a precondition check).
 */
 433 M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
 434 {
 436  ab->ab_type = M0_RPC_AT_EMPTY;
 437  ab->u.ab_extra.abr_user_buf = M0_BUF_INIT0;
 438  ab->u.ab_extra.abr_bulk = NULL;
 439 }
440 
/*
 * Finalises an AT buffer: frees type-specific resources (inline data or
 * network descriptors), the referenced user buffer, and the bulk context
 * if present, then resets the type to M0_RPC_AT_EMPTY. NOTE(review):
 * line 458 (the body of the "rpc_at_bulk(ab) != NULL" branch, presumably
 * rpc_at_bulk_fini(rpc_at_bulk(ab))) was dropped by the rendering.
 */
 441 M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
 442 {
 443  switch (ab->ab_type) {
 444  case M0_RPC_AT_INLINE:
 445  m0_buf_free(&ab->u.ab_buf);
 446  break;
 447  case M0_RPC_AT_BULK_SEND:
 448  m0_net_desc_free(&ab->u.ab_send.bdd_desc);
 449  break;
 450  case M0_RPC_AT_BULK_RECV:
 451  m0_net_desc_free(&ab->u.ab_recv.bdd_desc);
 452  break;
 453  }
 454 
 455  if (m0_buf_is_set(&ab->u.ab_extra.abr_user_buf))
 456  m0_buf_free(&ab->u.ab_extra.abr_user_buf);
 457  if (rpc_at_bulk(ab) != NULL)
 459  ab->ab_type = M0_RPC_AT_EMPTY;
 460 }
461 
/*
 * Attaches user data "buf" to an AT buffer for sending: small or
 * non-page-aligned buffers go inline in the fop, larger aligned ones are
 * sent via client-side bulk (in which case a reference to the user
 * buffer is kept so m0_rpc_at_fini() can free it). NOTE(review): line
 * 481 (presumably "ab->ab_type = M0_RPC_AT_INLINE;") was dropped by the
 * rendering.
 */
 462 M0_INTERNAL int m0_rpc_at_add(struct m0_rpc_at_buf *ab,
 463  const struct m0_buf *buf,
 464  const struct m0_rpc_conn *conn)
 465 {
 466  m0_bcount_t blen = buf->b_nob;
 467  int rc = 0;
 468 
 469  M0_ENTRY();
 470  M0_PRE(ab != NULL);
 471  M0_PRE(buf != NULL);
 472  M0_PRE(conn != NULL);
 473 
 474  /*
 475  * If buffer is too big to fit in FOP, but inbulk is impossible due to
 476  * non-aligned address, then error will be returned during FOP
 477  * transmission.
 478  */
 479  if (blen < rpc_at_bulk_cutoff(conn) ||
 480  !m0_addr_is_aligned(buf->b_addr, m0_pageshift_get())) {
 482  ab->u.ab_buf = *buf;
 483  } else {
 484  rc = rpc_at_bulk_init(ab, conn) ?:
 485  rpc_at_bulk_csend(ab, buf);
 486  if (rc == 0)
 487  ab->u.ab_extra.abr_user_buf = *buf;
 488  }
 489  return M0_RC(rc);
 490 }
491 
492 M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
493 {
494  switch (ab->ab_type) {
495  case M0_RPC_AT_EMPTY:
496  case M0_RPC_AT_BULK_RECV:
497  return false;
498  case M0_RPC_AT_INLINE:
499  case M0_RPC_AT_BULK_SEND:
500  return true;
501  case M0_RPC_AT_BULK_REP:
502  return m0_rpc_at_len(ab) > 0;
503  default:
504  M0_IMPOSSIBLE("Incorrect AT type");
505  }
506 }
507 
508 M0_INTERNAL int m0_rpc_at_recv(struct m0_rpc_at_buf *ab,
509  const struct m0_rpc_conn *conn,
510  uint32_t len,
511  bool force_bulk)
512 {
513  int rc = 0;
514 
515  M0_PRE(!m0_rpc_at_is_set(ab));
516  M0_PRE(conn != NULL);
517 
518  if ((len == M0_RPC_AT_UNKNOWN_LEN ||
519  len < rpc_at_bulk_cutoff(conn)) &&
520  !force_bulk)
521  ab->ab_type = M0_RPC_AT_EMPTY;
522  else
523  rc = rpc_at_bulk_init(ab, conn) ?:
524  rpc_at_bulk_crecv(ab, len);
525  return M0_RC(rc);
526 }
527 
/*
 * Sends a reply buffer "repbuf" back to the client: inline if it is
 * below the bulk cutoff and the client did not set up a bulk receive,
 * via server-side bulk otherwise. Moves the fom to "next_phase" and
 * returns M0_FSO_WAIT when a bulk transfer is in flight, M0_FSO_AGAIN
 * otherwise. On error the reply buffer is freed here. NOTE(review):
 * lines 540, 563-564 (apparently the call whose rc is tested at line
 * 565) and 568 were dropped by the rendering.
 */
 528 M0_INTERNAL int m0_rpc_at_reply(struct m0_rpc_at_buf *in,
 529  struct m0_rpc_at_buf *out,
 530  struct m0_buf *repbuf,
 531  struct m0_fom *fom,
 532  int next_phase)
 533 {
 534  const struct m0_rpc_conn *conn = fom_conn(fom);
 535  m0_bcount_t blen = repbuf->b_nob;
 536  bool use_bulk = false;
 537  int result = M0_FSO_AGAIN;
 538  int rc = 0;
 539 
 541 
 542  m0_fom_phase_set(fom, next_phase);
 543 
 544  if (blen < rpc_at_bulk_cutoff(conn)) {
 545  if (in != NULL && in->ab_type == M0_RPC_AT_BULK_RECV) {
 546  use_bulk = true;
 547  } else if (in == NULL || in->ab_type == M0_RPC_AT_EMPTY) {
 548  out->ab_type = M0_RPC_AT_INLINE;
 549  out->u.ab_buf = *repbuf;
 550  } else {
 551  rc = M0_ERR(-EPROTO);
 552  }
 553  } else {
 554  use_bulk = true;
 555  }
 556 
 557  if (use_bulk) {
 558  out->ab_type = M0_RPC_AT_BULK_REP;
 559  if (!m0_buf_is_set(repbuf))
 560  rc = M0_ERR(-ENODATA);
 561  if (rc == 0) {
 562  if (in != NULL && in->ab_type == M0_RPC_AT_BULK_RECV &&
 563  in->u.ab_recv.bdd_used >= repbuf->b_nob) {
 565  if (rc == 0) {
 566  rpc_at_bulk_ssend(in, out, repbuf, fom);
 567  if (rc != 0)
 569  }
 570  } else {
 571  rc = -ENOMSG; /* Not really a error. */
 572  }
 573  }
 574  out->u.ab_rep.abr_rc = rc;
 575  out->u.ab_rep.abr_len = repbuf->b_nob;
 576  result = rc == 0 ? M0_FSO_WAIT : M0_FSO_AGAIN;
 577  }
 578  if (rc != 0)
 579  m0_buf_free(repbuf);
 580  return result;
 581 }
582 
/*
 * Result of m0_rpc_at_reply(): -EPROTO for an empty reply buffer, the
 * recorded bulk result for a bulk reply, 0 for inline. NOTE(review):
 * lines 588-589 (remainder of the M0_IN type list in the assertion) and
 * 596 were dropped by the rendering.
 */
 583 M0_INTERNAL int m0_rpc_at_reply_rc(struct m0_rpc_at_buf *out)
 584 {
 585  int rc = 0;
 586 
 587  M0_ASSERT(M0_IN(out->ab_type, (M0_RPC_AT_EMPTY,
 590  /*
 591  * AT bulk structure is already deallocated or wasn't ever allocated.
 592  * User calling m0_rpc_at_reply() shouldn't call m0_rpc_at_fini()
 593  * afterwards, so AT bulk structure should be de-allocated to not cause
 594  * memory leaks.
 595  */
 597 
 598  if (out->ab_type == M0_RPC_AT_EMPTY)
 599  return M0_ERR(-EPROTO);
 600 
 601  if (out->ab_type == M0_RPC_AT_BULK_REP)
 602  rc = out->u.ab_rep.abr_rc;
 603  return M0_RC(rc);
 604 }
605 
/*
 * Extracts the reply data on the client side. Empty replies yield an
 * empty buffer, inline replies are handed out directly, bulk replies are
 * spliced (lazily, once) out of the landing buffer that was set up in
 * the matching "sent" request buffer. NOTE(review): lines 616, 619 and
 * 636 (the m0_bufvec_splice() call whose arguments continue on lines
 * 637-638) were dropped by the rendering.
 */
 606 M0_INTERNAL int m0_rpc_at_rep_get(struct m0_rpc_at_buf *sent,
 607  struct m0_rpc_at_buf *rcvd,
 608  struct m0_buf *out)
 609 {
 610  struct rpc_at_bulk *atbulk;
 611  struct m0_bufvec *bvec;
 612  struct m0_buf *recv;
 613  int rc = 0;
 614 
 615  M0_PRE(sent == NULL || M0_IN(sent->ab_type,
 617 
 618  if (!M0_IN(rcvd->ab_type, (M0_RPC_AT_EMPTY, M0_RPC_AT_INLINE,
 620  return M0_ERR_INFO(-EPROTO, "Incorrect AT type rcvd %u",
 621  rcvd->ab_type);
 622 
 623  if (rcvd->ab_type == M0_RPC_AT_EMPTY) {
 624  *out = M0_BUF_INIT0;
 625  } else if (rcvd->ab_type == M0_RPC_AT_INLINE) {
 626  *out = rcvd->u.ab_buf;
 627  } else {
 628  rc = rcvd->u.ab_rep.abr_rc;
 629  if (rc == 0) {
 630  if (sent != NULL &&
 631  sent->ab_type == M0_RPC_AT_BULK_RECV) {
 632  atbulk = rpc_at_bulk(sent);
 633  bvec = &atbulk->ac_nb.nb_buffer;
 634  recv = &atbulk->ac_recv;
 635  if (!m0_buf_is_set(recv))
 637  atbulk->ac_nb.nb_length,
 638  recv);
 639  if (rc == 0)
 640  *out = *recv;
 641  } else {
 642  return M0_ERR_INFO(-EPROTO,
 643  "AT type mismatch rcvd %u",
 644  rcvd->ab_type);
 645  }
 646  }
 647  }
 648  return M0_RC(rc);
 649 }
650 
651 M0_INTERNAL bool m0_rpc_at_rep_is_bulk(const struct m0_rpc_at_buf *rcvd,
652  uint64_t *len)
653 {
654  if (rcvd->ab_type == M0_RPC_AT_BULK_REP) {
655  *len = rcvd->u.ab_rep.abr_len;
656  return true;
657  } else {
658  *len = M0_RPC_AT_UNKNOWN_LEN;
659  return false;
660  }
661 }
662 
/*
 * Converts a bulk reply into an inline one by splicing the landed data
 * into rcvd->u.ab_buf and switching the type to M0_RPC_AT_INLINE;
 * already-inline replies are a no-op, anything else is -EPROTO.
 * NOTE(review): line 678 (start of the m0_bufvec_splice() call whose
 * arguments continue on lines 679-680, per the index) was dropped by
 * the rendering.
 */
 663 M0_INTERNAL int m0_rpc_at_rep2inline(struct m0_rpc_at_buf *sent,
 664  struct m0_rpc_at_buf *rcvd)
 665 {
 666  struct rpc_at_bulk *atbulk;
 667  const struct m0_bufvec *bvec;
 668  int rc;
 669 
 670  if (rcvd->ab_type == M0_RPC_AT_INLINE) {
 671  rc = 0;
 672  } else if (rcvd->ab_type == M0_RPC_AT_BULK_REP) {
 673  rc = rcvd->u.ab_rep.abr_rc;
 674  if (rc == 0) {
 675  if (sent->ab_type == M0_RPC_AT_BULK_RECV) {
 676  atbulk = rpc_at_bulk(sent);
 677  bvec = &atbulk->ac_nb.nb_buffer;
 679  atbulk->ac_nb.nb_length,
 680  &rcvd->u.ab_buf);
 681  if (rc == 0)
 682  rcvd->ab_type = M0_RPC_AT_INLINE;
 683  } else {
 684  rc = M0_ERR(-EPROTO);
 685  }
 686  }
 687  } else {
 688  rc = -EPROTO;
 689  }
 690 
 691  return M0_RC(rc);
 692 }
693 
694 M0_INTERNAL void m0_rpc_at_detach(struct m0_rpc_at_buf *ab)
695 {
696  struct rpc_at_bulk *atbulk;
697 
698  switch (ab->ab_type) {
699  case M0_RPC_AT_INLINE:
700  ab->u.ab_buf = M0_BUF_INIT0;
701  break;
702  case M0_RPC_AT_BULK_RECV:
703  atbulk = rpc_at_bulk(ab);
704  atbulk->ac_recv = M0_BUF_INIT0;
705  }
706  ab->u.ab_extra.abr_user_buf = M0_BUF_INIT0;
707 }
708 
709 M0_INTERNAL uint64_t m0_rpc_at_len(const struct m0_rpc_at_buf *ab)
710 {
711  uint64_t ret;
712 
713  switch (ab->ab_type) {
714  case M0_RPC_AT_EMPTY:
715  ret = 0;
716  break;
717  case M0_RPC_AT_INLINE:
718  ret = ab->u.ab_buf.b_nob;
719  break;
720  case M0_RPC_AT_BULK_SEND:
721  ret = ab->u.ab_send.bdd_used;
722  break;
723  case M0_RPC_AT_BULK_RECV:
724  ret = ab->u.ab_recv.bdd_used;
725  break;
726  case M0_RPC_AT_BULK_REP:
727  ret = ab->u.ab_rep.abr_len;
728  break;
729  default:
730  M0_IMPOSSIBLE("Incorrect AT buf type");
731  }
732  return ret;
733 }
734 
735 #undef M0_TRACE_SUBSYSTEM
736 
739 /*
740  * Local variables:
741  * c-indentation-style: "K&R"
742  * c-basic-offset: 8
743  * tab-width: 8
744  * fill-column: 80
745  * scroll-step: 1
746  * End:
747  */
748 /*
749  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
750  */
static m0_bcount_t seg_size
Definition: net.c:118
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_segment_size(struct m0_net_domain *dom)
static int rpc_at_bulk_csend(struct m0_rpc_at_buf *ab, const struct m0_buf *buf)
Definition: at.c:202
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
M0_INTERNAL bool m0_buf_is_set(const struct m0_buf *buf)
Definition: buf.c:127
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct m0_fom * ac_user_fom
Definition: at.c:65
struct m0_net_buffer ac_nb
Definition: at.c:54
#define NULL
Definition: misc.h:38
static int rpc_at_bulk_srecv_rc(const struct m0_rpc_at_buf *ab, struct m0_buf *buf)
Definition: at.c:271
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
static void rpc_at_bulk_fini(struct rpc_at_bulk *atbulk)
Definition: at.c:183
M0_INTERNAL int m0_rpc_bulk_store(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *to_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:520
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
static bool m0_addr_is_aligned(const void *addr, unsigned shift)
Definition: memory.h:107
struct m0_bufvec nb_buffer
Definition: net.h:1322
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL int m0_bufvec_splice(const struct m0_bufvec *bvec, m0_bcount_t nr, struct m0_buf *buf)
Definition: vec.c:491
M0_INTERNAL int m0_rpc_bulk_buf_databuf_add(struct m0_rpc_bulk_buf *rbuf, void *buf, m0_bcount_t count, m0_bindex_t index, struct m0_net_domain *netdom)
Definition: bulk.c:331
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
#define min_check(a, b)
Definition: arith.h:88
struct m0_sm_group fl_group
Definition: fom.h:274
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
Definition: at.c:492
struct m0_chan rb_chan
Definition: bulk.h:258
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:263
struct m0_sm_ast ac_ast
Definition: at.c:70
m0_bcount_t nb_length
Definition: net.h:1334
static void rpc_at_bulk_store_del(struct m0_rpc_bulk *rbulk)
Definition: at.c:153
struct m0_net_domain * ntm_dom
Definition: net.h:853
struct m0_rpc_at_buf * ac_atbuf
Definition: at.c:51
M0_INTERNAL int m0_rpc_at_add(struct m0_rpc_at_buf *ab, const struct m0_buf *buf, const struct m0_rpc_conn *conn)
Definition: at.c:462
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL int m0_pageshift_get(void)
Definition: memory.c:238
Definition: sm.h:504
static int void * buf
Definition: dir.c:1019
M0_INTERNAL int m0_rpc_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: at.c:414
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
Definition: at.c:49
static int struct dentry int struct nameidata * nd
Definition: dir.c:593
Definition: sock.c:887
M0_INTERNAL int m0_rpc_at_reply(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *repbuf, struct m0_fom *fom, int next_phase)
Definition: at.c:528
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
M0_INTERNAL int m0_rpc_at_rep_get(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd, struct m0_buf *out)
Definition: at.c:606
int m0_bufvec_alloc_aligned(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size, unsigned shift)
Definition: vec.c:355
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
Definition: buf.h:37
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:530
int i
Definition: dir.c:1033
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
m0_bcount_t rm_bulk_cutoff
Definition: rpc_machine.h:157
M0_INTERNAL bool m0_rpc_at_rep_is_bulk(const struct m0_rpc_at_buf *rcvd, uint64_t *len)
Definition: at.c:651
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf)
struct m0_net_buffer * bb_nbuf
Definition: bulk.h:177
static bool rpc_at_ssend_complete_cb(struct m0_clink *clink)
Definition: at.c:330
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
m0_bcount_t b_nob
Definition: buf.h:38
int32_t rb_rc
Definition: bulk.h:266
#define M0_ASSERT(cond)
struct m0_clink ac_clink
Definition: at.c:60
static uint64_t rpc_at_bulk_segs_nr(const struct rpc_at_bulk *atbulk, m0_bcount_t data_size, m0_bcount_t *seg_size)
Definition: at.c:95
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:247
static struct m0_bufvec bvec
Definition: xcode.c:169
#define M0_BUF_INIT0
Definition: buf.h:71
static void rpc_at_ssend_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: at.c:315
M0_INTERNAL int m0_rpc_at_get(const struct m0_rpc_at_buf *ab, struct m0_buf *buf)
Definition: at.c:399
M0_INTERNAL int m0_rpc_at_reply_rc(struct m0_rpc_at_buf *out)
Definition: at.c:583
static int rpc_at_bulk_nb_alloc(struct m0_rpc_at_buf *ab, uint64_t size)
Definition: at.c:108
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
Definition: net.c:87
union m0_rpc_at_buf::@447 u
Definition: dump.c:103
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
struct m0_rpc_conn conn
Definition: fsync.c:96
static struct m0_clink clink[RDWR_REQUEST_MAX]
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
static uint64_t data_size(const struct m0_pdclust_layout *play)
Definition: file.c:550
static int rpc_at_bulk_ssend(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *buf, struct m0_fom *fom)
Definition: at.c:343
static int rpc_at_bulk_srecv(struct m0_rpc_at_buf *ab, struct m0_fom *fom)
Definition: at.c:235
M0_INTERNAL void m0_bufvec_free_aligned(struct m0_bufvec *bufvec, unsigned shift)
Definition: vec.c:436
struct m0_buf abr_user_buf
Definition: at.h:246
const struct m0_rpc_conn * ac_conn
Definition: at.c:53
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
static struct rpc_at_bulk * rpc_at_bulk(const struct m0_rpc_at_buf *ab)
Definition: at.c:75
Definition: fom.h:481
static int rpc_at_bulk_crecv(struct m0_rpc_at_buf *ab, uint32_t len)
Definition: at.c:288
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)
struct m0_fom_locality * fo_loc
Definition: fom.h:483
m0_net_queue_type
Definition: net.h:591
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL uint64_t m0_rpc_at_len(const struct m0_rpc_at_buf *ab)
Definition: at.c:709
struct m0_rpc_bulk ac_bulk
Definition: at.c:52
m0_bcount_t size
Definition: di.c:39
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
M0_INTERNAL bool m0_rpc_bulk_is_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:539
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
static struct m0_rpc_conn * fom_conn(const struct m0_fom *fom)
Definition: at.c:90
struct m0_tl rb_buflist
Definition: bulk.h:256
static int rpc_at_bulk_init(struct m0_rpc_at_buf *ab, const struct m0_rpc_conn *conn)
Definition: at.c:132
static struct m0_net_domain * rpc_at_bulk_ndom(const struct rpc_at_bulk *atbulk)
Definition: at.c:85
#define out(...)
Definition: gen.c:41
M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:215
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
Definition: bulk.c:238
static enum m0_net_queue_type rpc_at_bulk_qtype(struct m0_rpc_bulk *rbulk)
Definition: at.c:165
int ac_rc
Definition: at.c:72
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
Definition: bulk.c:291
M0_INTERNAL int m0_rpc_at_rep2inline(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd)
Definition: at.c:663
static m0_bcount_t rpc_at_bulk_cutoff(const struct m0_rpc_conn *conn)
Definition: at.c:80
struct m0_buf ac_recv
Definition: at.c:56
void m0_free(void *data)
Definition: memory.c:146
int32_t rc
Definition: trigger_fop.h:47
struct m0_mutex rb_mutex
Definition: bulk.h:251
Definition: vec.h:145
M0_INTERNAL void m0_rpc_at_detach(struct m0_rpc_at_buf *ab)
Definition: at.c:694
M0_INTERNAL int m0_rpc_at_recv(struct m0_rpc_at_buf *ab, const struct m0_rpc_conn *conn, uint32_t len, bool force_bulk)
Definition: at.c:508
uint32_t ab_type
Definition: at.h:251
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
#define M0_IMPOSSIBLE(fmt,...)
static void rpc_at_bulk_nb_free(struct rpc_at_bulk *atbulk)
Definition: at.c:127
struct rpc_at_bulk * abr_bulk
Definition: at.h:245