Motr  M0
frmops.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 #include "lib/memory.h"
26 #include "lib/misc.h"
27 #include "lib/errno.h"
28 #include "lib/arith.h"
29 #include "lib/bob.h"
30 #include "lib/finject.h"
31 
32 #include "motr/magic.h"
33 #include "net/net.h"
34 #include "rpc/rpc_internal.h"
35 #include "rpc/service.h"
36 
42 static int packet_ready(struct m0_rpc_packet *p);
43 
44 static int net_buffer_allocate(struct m0_net_buffer *netbuf,
45  struct m0_net_domain *ndom,
47 
48 static void net_buffer_free(struct m0_net_buffer *netbuf,
49  struct m0_net_domain *ndom);
50 
51 static void bufvec_geometry(struct m0_net_domain *ndom,
53  int32_t *out_nr_segments,
54  m0_bcount_t *out_segment_size);
55 
56 static void item_done(struct m0_rpc_packet *p, struct m0_rpc_item *item,
57  int rc);
58 static void item_fail(struct m0_rpc_packet *p, struct m0_rpc_item *item,
59  int rc);
60 static void item_sent(struct m0_rpc_item *item);
61 
62 /*
63  * This is the only symbol exported from this file.
64  */
67 };
68 
/*
 * Timeout, in seconds, given to a network buffer when it is queued for
 * sending (see rpc_buffer_submit()).
 */
enum {
	M0_RPC_TMO = 8
};
70 
83 struct rpc_buffer {
87  uint64_t rb_magic;
88 };
89 
90 static const struct m0_bob_type rpc_buffer_bob_type = {
91  .bt_name = "rpc_buffer",
92  .bt_magix_offset = M0_MAGIX_OFFSET(struct rpc_buffer, rb_magic),
93  .bt_magix = M0_RPC_BUF_MAGIC,
94  .bt_check = NULL,
95 };
96 
98 
/* Allocates the network buffer of @rpcbuf and encodes packet @p into it. */
static int rpc_buffer_init(struct rpc_buffer    *rpcbuf,
			   struct m0_rpc_packet *p);

/* Queues the network buffer of @rpcbuf for sending. */
static int rpc_buffer_submit(struct rpc_buffer *rpcbuf);

/* Releases the network buffer owned by @rpcbuf. */
static void rpc_buffer_fini(struct rpc_buffer *rpcbuf);

/* Completion callback for network buffers queued by rpc_buffer_submit(). */
static void buf_send_cb(const struct m0_net_buffer_event *ev);
107 
109  .nbc_cb = {
111  }
112 };
113 
114 static struct m0_rpc_machine *
115 rpc_buffer__rmachine(const struct rpc_buffer *rpcbuf)
116 {
117  struct m0_rpc_machine *rmachine;
118 
119  M0_PRE(rpc_buffer_bob_check(rpcbuf) &&
120  rpcbuf->rb_packet != NULL &&
121  rpcbuf->rb_packet->rp_frm != NULL);
122 
124  M0_ASSERT(rmachine != NULL);
125 
126  return rmachine;
127 }
128 
/*
 * Sends packet p: wraps it in a freshly allocated rpc_buffer, encodes it
 * into a network buffer (rpc_buffer_init()) and queues that buffer for
 * sending (rpc_buffer_submit()).
 *
 * On success the rpc_buffer stays allocated; buf_send_cb() finalises and
 * frees it. On failure whatever was acquired here is released through the
 * out / err_free / err labels and the error code is returned.
 *
 * NOTE(review): this listing has gaps (listing lines 141, 156, 161, 164,
 * 175 and 176 are absent); each gap is marked below.
 */
135 static int packet_ready(struct m0_rpc_packet *p)
136 {
137  struct rpc_buffer *rpcbuf;
138  int rc;
139 
140  M0_ENTRY("packet: %p", p);
 /* NOTE(review): listing line 141 is missing here. */
142 
143  M0_ALLOC_PTR(rpcbuf);
144  if (rpcbuf == NULL) {
145  rc = M0_ERR(-ENOMEM);
146  M0_LOG(M0_ERROR, "Failed to allocate rpcbuf");
147  goto err;
148  }
149  rc = rpc_buffer_init(rpcbuf, p);
150  if (rc != 0)
 /* rpc_buffer_init() cleans up after itself; only rpcbuf is freed. */
151  goto err_free;
152 
 /* Fault injection: pretend the network is down for reply items. */
153  if (M0_FI_ENABLED("set_reply_error")) {
154  struct m0_rpc_item *item;
155 
 /*
  * NOTE(review): listing line 156 is missing; presumably it opens an
  * iteration over the items of p that assigns item -- TODO confirm.
  */
157  if (m0_rpc_item_is_reply(item)) {
158  rc = -ENETDOWN;
159  M0_LOG(M0_ERROR, "packet %p, item %p[%"PRIu32"]"
160  " set error to %d", p, item,
 /* NOTE(review): listing line 161 (rest of the M0_LOG arguments) is missing. */
162  goto out;
163  }
 /* NOTE(review): listing line 164 is missing; presumably closes the iteration. */
165  }
166 
167  rc = rpc_buffer_submit(rpcbuf);
168  if (rc == 0)
 /* Success: rpcbuf is now released by buf_send_cb(). */
169  return M0_RC(rc);
 /* Error unwinding, most-recently-acquired resource first. */
170 out:
171  rpc_buffer_fini(rpcbuf);
172 err_free:
173  m0_free(rpcbuf);
174 err:
 /*
  * NOTE(review): listing lines 175-176 are missing; presumably the items of
  * p are failed with rc via item_fail() (its comment says it is only called
  * from this error path) and the packet is disposed of -- TODO confirm.
  */
177 
178  return M0_RC(rc);
179 }
180 
185 static int rpc_buffer_init(struct rpc_buffer *rpcbuf,
186  struct m0_rpc_packet *p)
187 {
188  struct m0_net_buffer *netbuf;
189  struct m0_net_domain *ndom;
190  struct m0_rpc_machine *machine;
191  struct m0_rpc_chan *rchan;
192  int rc;
193 
194  M0_ENTRY("rbuf: %p packet: %p", rpcbuf, p);
195  M0_PRE(rpcbuf != NULL && p != NULL);
196 
197  machine = frm_rmachine(p->rp_frm);
198  ndom = machine->rm_tm.ntm_dom;
199  M0_ASSERT(ndom != NULL);
200 
201  netbuf = &rpcbuf->rb_netbuf;
202  rc = net_buffer_allocate(netbuf, ndom, p->rp_size);
203  if (rc != 0)
204  goto out;
205 
206  rc = m0_rpc_packet_encode(p, &netbuf->nb_buffer);
207  if (rc != 0) {
208  net_buffer_free(netbuf, ndom);
209  goto out;
210  }
211  rchan = frm_rchan(p->rp_frm);
212  netbuf->nb_length = m0_vec_count(&netbuf->nb_buffer.ov_vec);
213  netbuf->nb_ep = rchan->rc_destep;
214 
215  rpcbuf->rb_packet = p;
216  rpc_buffer_bob_init(rpcbuf);
217 
218 out:
219  return M0_RC(rc);
220 }
221 
225 static int net_buffer_allocate(struct m0_net_buffer *netbuf,
226  struct m0_net_domain *ndom,
228 {
229  m0_bcount_t segment_size;
230  int32_t nr_segments;
231  int rc;
232 
233  M0_ENTRY("netbuf: %p ndom: %p bufsize: %llu", netbuf, ndom,
234  (unsigned long long)buf_size);
235  M0_PRE(netbuf != NULL && ndom != NULL && buf_size > 0);
236 
237  bufvec_geometry(ndom, buf_size, &nr_segments, &segment_size);
238 
239  M0_SET0(netbuf);
240  rc = m0_bufvec_alloc_aligned(&netbuf->nb_buffer, nr_segments,
241  segment_size, M0_SEG_SHIFT);
242  if (rc != 0) {
243  M0_LOG(M0_ERROR, "buffer allocation failed");
244  goto out;
245  }
246 
247  rc = m0_net_buffer_register(netbuf, ndom);
248  if (rc != 0) {
249  M0_LOG(M0_ERROR, "net buf registeration failed");
250  m0_bufvec_free_aligned(&netbuf->nb_buffer, M0_SEG_SHIFT);
251  }
252 out:
253  return M0_RC(rc);
254 }
255 
261 static void bufvec_geometry(struct m0_net_domain *ndom,
263  int32_t *out_nr_segments,
264  m0_bcount_t *out_segment_size)
265 {
266  m0_bcount_t max_buf_size;
267  m0_bcount_t max_segment_size;
268  m0_bcount_t segment_size;
269  int32_t max_nr_segments;
270  int32_t nr_segments;
271 
272  M0_ENTRY();
273 
274  max_buf_size = m0_rpc_max_msg_size(ndom, 0);
275  max_segment_size = m0_rpc_max_seg_size(ndom);
276  max_nr_segments = m0_rpc_max_segs_nr(ndom);
277 
279  "max_buf_size: %llu max_segment_size: %llu max_nr_seg: %d",
280  (unsigned long long)max_buf_size,
281  (unsigned long long)max_segment_size, max_nr_segments);
282 
283  M0_ASSERT(buf_size <= max_buf_size);
284 
285  /* encoding routine requires buf_size to be 8 byte aligned */
286  buf_size = m0_align(buf_size, 8);
287  M0_LOG(M0_DEBUG, "bufsize: 0x%llx", (unsigned long long)buf_size);
288 
289  if (buf_size <= max_segment_size) {
290  segment_size = buf_size;
291  nr_segments = 1;
292  } else {
293  segment_size = max_segment_size;
294 
295  nr_segments = buf_size / max_segment_size;
296  if (buf_size % max_segment_size != 0)
297  ++nr_segments;
298  }
299 
300  *out_segment_size = segment_size;
301  *out_nr_segments = nr_segments;
302 
303  M0_LEAVE("seg_size: %llu nr_segments: %d",
304  (unsigned long long)*out_segment_size, *out_nr_segments);
305 }
306 
307 static void net_buffer_free(struct m0_net_buffer *netbuf,
308  struct m0_net_domain *ndom)
309 {
310  M0_ENTRY("netbuf: %p ndom: %p", netbuf, ndom);
311  M0_PRE(netbuf != NULL && ndom != NULL);
312 
313  m0_net_buffer_deregister(netbuf, ndom);
314  m0_bufvec_free_aligned(&netbuf->nb_buffer, M0_SEG_SHIFT);
315 
316  M0_LEAVE();
317 }
318 
/*
 * Queues the network buffer of rpcbuf for sending on the transfer machine
 * (rm_tm) of the rpc machine that owns the packet.
 *
 * The buffer is placed on the M0_NET_QT_MSG_SEND queue with rpc_buf_send_cb
 * as its callback table and a timeout of M0_RPC_TMO seconds from now.
 *
 * Returns the result of m0_net_buffer_add().
 */
322 static int rpc_buffer_submit(struct rpc_buffer *rpcbuf)
323 {
324  struct m0_net_buffer *netbuf;
325  struct m0_rpc_machine *machine;
326  int rc;
327 
328  M0_ENTRY("rpcbuf: %p", rpcbuf);
329  M0_PRE(rpc_buffer_bob_check(rpcbuf));
330 
331  netbuf = &rpcbuf->rb_netbuf;
332 
333  netbuf->nb_qtype = M0_NET_QT_MSG_SEND;
334  netbuf->nb_callbacks = &rpc_buf_send_cb;
335 
336  machine = rpc_buffer__rmachine(rpcbuf);
337  netbuf->nb_timeout = m0_time_from_now(M0_RPC_TMO, 0);
338  rc = m0_net_buffer_add(netbuf, &machine->rm_tm);
339  if (rc == 0) {
 /*
  * NOTE(review): listing line 340 is missing; given the "+" in the log
  * message below it presumably increments machine->rm_active_nb --
  * TODO confirm.
  */
341  M0_LOG(M0_DEBUG,"+%p->rm_active_nb: %" PRIi64 " %p\n",
342  machine, machine->rm_active_nb, rpcbuf);
343  }
344 
345  return M0_RC(rc);
346 }
347 
348 static void rpc_buffer_fini(struct rpc_buffer *rpcbuf)
349 {
350  struct m0_net_domain *ndom;
351  struct m0_rpc_machine *machine;
352 
353  M0_ENTRY("rpcbuf: %p", rpcbuf);
354  M0_PRE(rpc_buffer_bob_check(rpcbuf));
355 
356  machine = rpc_buffer__rmachine(rpcbuf);
357  ndom = machine->rm_tm.ntm_dom;
358  M0_ASSERT(ndom != NULL);
359 
360  net_buffer_free(&rpcbuf->rb_netbuf, ndom);
361  rpc_buffer_bob_fini(rpcbuf);
362 
363  M0_LEAVE();
364 }
365 
/*
 * Network-buffer callback for buffers queued by rpc_buffer_submit().
 *
 * The event's buffer must be a message-send buffer that is no longer
 * queued. The event status becomes the packet status, the rpc_buffer is
 * finalised and freed, and the send statistics of the rpc machine are
 * updated.
 *
 * NOTE(review): this listing has gaps (listing lines 387, 394, 411, 415,
 * 433-435 and 437 are absent); each gap is marked below.
 */
370 static void buf_send_cb(const struct m0_net_buffer_event *ev)
371 {
372  struct m0_net_buffer *netbuf;
373  struct rpc_buffer *rpcbuf;
374  struct m0_rpc_machine *machine;
375  struct m0_rpc_stats *stats;
376  struct m0_rpc_packet *p;
377 
378  M0_ENTRY("ev: %p", ev);
379  M0_PRE(ev != NULL);
380 
381  netbuf = ev->nbe_buffer;
382  M0_ASSERT(netbuf != NULL &&
383  netbuf->nb_qtype == M0_NET_QT_MSG_SEND &&
384  (netbuf->nb_flags & M0_NET_BUF_QUEUED) == 0);
385 
 /* Recover the enclosing rpc_buffer from its embedded network buffer. */
386  rpcbuf = bob_of(netbuf, struct rpc_buffer, rb_netbuf,
 /* NOTE(review): listing line 387 (last bob_of argument) is missing. */
388 
389  machine = rpc_buffer__rmachine(rpcbuf);
390 
 /* Fault injection: delay the callback. */
391  if (M0_FI_ENABLED("delay_callback"))
392  m0_nanosleep(m0_time(0, 300000000), NULL); /* 300 msec */
393 
 /*
  * NOTE(review): listing line 394 is missing; presumably the rpc machine
  * lock is taken here -- TODO confirm.
  */
395 
396  stats = &machine->rm_stats;
397  p = rpcbuf->rb_packet;
398  p->rp_status = ev->nbe_status;
399 
 /* Fault injection: force the packet to look failed. */
400  if (M0_FI_ENABLED("fake_err"))
401  p->rp_status = -EINVAL;
 /* p was saved above; rpcbuf must not be dereferenced after this point. */
402  rpc_buffer_fini(rpcbuf);
403  m0_free(rpcbuf);
404 
405  if (p->rp_status == 0) {
406  stats->rs_nr_sent_packets++;
407  stats->rs_nr_sent_bytes += p->rp_size;
408  } else {
409  stats->rs_nr_failed_packets++;
410  }
 /*
  * NOTE(review): listing line 411 is missing; given the "-" in the log
  * message below it presumably decrements machine->rm_active_nb --
  * TODO confirm.
  */
 /* rpcbuf is only printed as an address here, never dereferenced. */
412  M0_LOG(M0_DEBUG, "-%p->rm_active_nb: %" PRIi64 " %p\n",
413  machine, machine->rm_active_nb, rpcbuf);
414  if (machine->rm_active_nb == 0)
 /*
  * NOTE(review): listing line 415 (the statement guarded by the if above)
  * is missing; presumably it signals that no network buffers are active
  * any more -- TODO confirm.
  */
416  /*
417  * At this point, rpc subsystem is normally having 4 refs on item/fop:
418  * - m0_rpc__post_locked()
419  * - m0_rpc_item_send()
420  * - item pending cache
421  * - for the formation queue or package (depends on item state)
422  *
423  * Two more refs can be taken in the following cases:
424  * - sync rpc_post (normally used in UT)
425  * - fop_alloc takes first ref but the caller may release it right
426  * after sending to rpc subsystem
427  *
428  * Exceptions:
429  * - session0 fops (such as session termination) normally have 3 refs,
430  * as they don't add items to pending cache
431  * - reply item may have 4 refs but they are taken in other places
432  */
 /*
  * NOTE(review): listing lines 433-435 and 437 are missing; presumably the
  * items of p are completed (item_done()), the packet is handed back to the
  * formation machine and the machine lock is released -- TODO confirm.
  */
436 
438  M0_LEAVE();
439 }
440 
/*
 * Completes one item of packet p after a send attempt that finished with
 * result rc.
 *
 * If a reply is already pending for the item, the send error is ignored.
 * Otherwise ri_error keeps its current non-zero value if it has one, or
 * takes rc, with -ECONNREFUSED mapped to 0. An item left with an error is
 * logged as failed; any other item goes to item_sent().
 *
 * NOTE(review): this listing has gaps (listing lines 445, 473-474, 477,
 * 481-482 and 487 are absent); each gap is marked below.
 */
441 static void item_done(struct m0_rpc_packet *p, struct m0_rpc_item *item, int rc)
442 {
443  M0_PRE(item != NULL);
444  M0_ENTRY("item=%p[%"PRIu32"] ri_error=%"PRIi32" rc=%d",
 /* NOTE(review): listing line 445 (the M0_ENTRY arguments) is missing. */
446 
447  if (item->ri_pending_reply != NULL) {
448  /* item that is never sent, i.e. item->ri_nr_sent == 0,
449  can never have a (pending/any) reply.
450  */
451  M0_ASSERT(ergo(rc != 0, item->ri_nr_sent > 0));
 /* A reply has arrived, so the send outcome no longer matters. */
452  rc = 0;
453  item->ri_error = 0;
454  }
455 
456  /*
457  * Item timeout by sending deadline is also counted as sending error
458  * and the ref, released in processing reply, is released here.
459  */
460  /*
461  * If rc is -ECONNREFUSED, that means the remote service has not yet
462  * started. In such cases, resend the item after the resend interval
463  * instead of marking the item as failed and flooding the network with
464  * connection requests.
465  */
 /*
  * "a ?: b" is the GNU conditional with omitted middle operand: an already
  * recorded error is kept, otherwise rc is used (-ECONNREFUSED becomes 0).
  */
466  item->ri_error = item->ri_error ?: rc == -ECONNREFUSED ? 0 : rc;
467  if (item->ri_error != 0) {
468  /*
469  * Normally this put() would call at m0_rpc_item_process_reply(),
470  * but there won't be any replies for non-oneway items of this
471  * packet already.
472  */
 /*
  * NOTE(review): listing lines 473-474 are missing; per the comment above,
  * presumably an item reference is put here for non-oneway items --
  * TODO confirm.
  */
475  }
476  if (item->ri_error != 0 &&
 /* NOTE(review): listing line 477 (rest of the condition) is missing. */
 /*
  * NOTE(review): the format string below has no separator between
  * "ri_nr_sent_max=%"PRIu64 and "ri_deadline=", so the two fields run
  * together in the log output.
  */
478  M0_LOG(M0_ERROR, " item: %p dest_ep=%s ri_session=%p ri_nr_sent_max=%"PRIu64
479  "ri_deadline=%" PRIu64 " ri_nr_sent=%u", item,
480  item->ri_session == NULL ? "NOT AVAILABLE" :
 /* NOTE(review): listing lines 481-482 (remaining M0_LOG arguments) are missing. */
483 
484  M0_LOG(M0_ERROR, "packet %p, item %p[%"PRIu32"] failed with"
485  " ri_error=%"PRIi32, p, item, item->ri_type->rit_opcode,
486  item->ri_error);
 /*
  * NOTE(review): listing line 487 is missing; presumably the item is moved
  * to its failed state with ri_error -- TODO confirm.
  */
488  } else
489  item_sent(item);
490 
491  M0_LEAVE();
492 }
493 
/*
 * Handles an item whose packet was sent without error: updates the
 * sent/resent item statistics and, as far as this extract shows, handles a
 * cancelled item and a reply that arrived before the buffer callback.
 *
 * NOTE(review): this listing is heavily truncated (listing lines 500, 502,
 * 504, 509, 511, 515, 517, 519, 521, 526, 540, 548-549, 551-552 and 556-558
 * are absent), so several conditions and actions cannot be read from here.
 * Each gap is marked below.
 */
494 static void item_sent(struct m0_rpc_item *item)
495 {
496  struct m0_rpc_stats *stats;
497 
498  M0_ENTRY("%p[%s/%"PRIu32"], sent=%u max=%lx"
499  " item->ri_sm.sm_state %"PRIu32" ri_error=%"PRIi32,
 /* NOTE(review): listing line 500 (first M0_ENTRY arguments) is missing. */
501  item->ri_nr_sent, (unsigned long)item->ri_nr_sent_max,
 /* NOTE(review): listing line 502 (last M0_ENTRY arguments) is missing. */
503 
 /* NOTE(review): listing line 504 (the condition opening this branch) is missing. */
505  /*
506  * Request might have been cancelled while in SENDING state
507  * and before reaching M0_RPC_ITEM_SENT state.
508  */
 /* NOTE(review): listing lines 509 and 511 are missing. */
510  M0_ASSERT(item->ri_error == -ECANCELED);
512  return;
513  }
514 
 /*
  * NOTE(review): listing lines 515, 517, 519 and 521 are missing; line 516
  * is the middle of a larger expression that checks ri_error is 0 or
  * -ETIMEDOUT. stats is presumably assigned on one of the missing lines --
  * TODO confirm.
  */
516  M0_IN(item->ri_error, (0, -ETIMEDOUT))) &&
518 
520 
522  stats->rs_nr_sent_items++;
523 
524  if (item->ri_nr_sent == 1) { /* not resent */
525  stats->rs_nr_sent_items_uniq++;
 /* NOTE(review): listing line 526 is missing. */
527  } else if (item->ri_nr_sent == 2) {
528  /* item with ri_nr_sent >= 2 are counted as 1 in
529  rs_nr_resent_items i.e. rs_nr_resent_items counts number
530  of items that required resending.
531  */
532  stats->rs_nr_resent_items++;
533  }
534 
535  /*
536  * Reference release done here is for the reference taken in
537  * m0_rpc_item_send() and also for one-way items corresponding
538  * reference taken in m0_rpc_oneway_item_post_locked().
539  */
 /* NOTE(review): listing line 540 (the reference release described above) is missing. */
541 
542  /*
543  * Request and Reply items take hold on session until
544  * they are SENT/FAILED.
545  * See: m0_rpc__post_locked(), m0_rpc_reply_post()
546  * m0_rpc_item_send()
547  */
 /*
  * NOTE(review): listing lines 548-549 and 551-552 are missing; they include
  * the action described above and the opening of the block closed at
  * line 560.
  */
550 
553  if (item->ri_pending_reply != NULL) {
554  /* Reply has already been received when we
555  were waiting for buffer callback */
 /*
  * NOTE(review): listing lines 556-558 are missing; presumably the pending
  * reply is processed here -- TODO confirm.
  */
559  }
560  }
561 
562  M0_LEAVE();
563 }
564 
/*
 * Records rc as the error of item, which belongs to packet p, and logs the
 * failure when rc is non-zero.
 *
 * NOTE(review): this listing has gaps (listing lines 570 and 577-578 are
 * absent); each gap is marked below.
 */
565 static void item_fail(struct m0_rpc_packet *p, struct m0_rpc_item *item, int rc)
566 {
567  /* This is only called from packet_ready() error handling code path. */
568  M0_PRE(item != NULL);
569  M0_ENTRY("item=%p[%"PRIu32"] ri_error=%"PRIi32" rc=%d",
 /* NOTE(review): listing line 570 (the M0_ENTRY arguments) is missing. */
571 
 /* Unlike item_done(), rc unconditionally replaces any earlier ri_error. */
572  item->ri_error = rc;
573  if (item->ri_error != 0) {
574  M0_LOG(M0_ERROR, "packet %p, item %p[%"PRIu32"] failed with"
575  " ri_error=%"PRIi32, p, item, item->ri_type->rit_opcode,
576  item->ri_error);
 /*
  * NOTE(review): listing lines 577-578 are missing; presumably the item is
  * moved to its failed state here -- TODO confirm.
  */
579  }
580 
581  M0_LEAVE();
582 }
583 
584 #undef M0_TRACE_SUBSYSTEM
585 
M0_INTERNAL uint32_t m0_rpc_max_segs_nr(struct m0_net_domain *ndom)
Definition: rpc.c:293
uint32_t rit_opcode
Definition: item.h:474
static struct m0_addb2_philter p
Definition: consumer.c:40
#define M0_PRE(cond)
struct m0_chan rm_nb_idle
Definition: rpc_machine.h:141
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:728
#define NULL
Definition: misc.h:38
static struct m0_rpc_chan rchan
Definition: formation2.c:37
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:65
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL void m0_rpc_frm_packet_done(struct m0_rpc_packet *p)
Definition: formation2.c:698
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_rpc_packet_traverse_items(struct m0_rpc_packet *p, item_visit_fn *visit, int opaque_data)
Definition: packet.c:524
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
Definition: item.c:742
struct m0_vec ov_vec
Definition: vec.h:147
struct m0_sm ri_sm
Definition: item.h:181
uint64_t rb_magic
Definition: frmops.c:87
static void item_done(struct m0_rpc_packet *p, struct m0_rpc_item *item, int rc)
Definition: frmops.c:441
m0_bcount_t nb_length
Definition: net.h:1334
uint64_t nb_flags
Definition: net.h:1489
struct m0_net_domain * ntm_dom
Definition: net.h:853
int32_t ri_error
Definition: item.h:161
struct m0_rpc_item * ri_pending_reply
Definition: item.h:187
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
Definition: item.c:356
#define M0_SET0(obj)
Definition: misc.h:64
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL struct m0_rpc_chan * frm_rchan(const struct m0_rpc_frm *frm)
Definition: formation2.c:156
const char * bt_name
Definition: bob.h:73
static void item_sent(struct m0_rpc_item *item)
Definition: frmops.c:494
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
M0_INTERNAL bool m0_rpc_packet_invariant(const struct m0_rpc_packet *p)
Definition: packet.c:82
int m0_bufvec_alloc_aligned(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size, unsigned shift)
Definition: vec.c:355
return M0_RC(rc)
static struct m0_trace_stats stats
Definition: ktrace.c:69
#define M0_ENTRY(...)
Definition: trace.h:170
#define PRIu64
Definition: types.h:58
int32_t nbe_status
Definition: net.h:1218
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
M0_INTERNAL void m0_rpc_item_process_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1464
M0_INTERNAL struct m0_rpc_machine * frm_rmachine(const struct m0_rpc_frm *frm)
Definition: formation2.c:161
static struct m0_rpc_machine * rpc_buffer__rmachine(const struct rpc_buffer *rpcbuf)
Definition: frmops.c:115
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
M0_INTERNAL m0_bcount_t m0_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: rpc.c:302
static struct m0_rpc_machine rmachine
Definition: formation2.c:36
static const struct m0_net_buffer_callbacks rpc_buf_send_cb
Definition: frmops.c:108
static int rpc_buffer_submit(struct rpc_buffer *rpcbuf)
Definition: frmops.c:322
static int packet_ready(struct m0_rpc_packet *p)
Definition: frmops.c:135
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
struct m0_net_buffer rb_netbuf
Definition: frmops.c:84
static void net_buffer_free(struct m0_net_buffer *netbuf, struct m0_net_domain *ndom)
Definition: frmops.c:307
struct m0_net_end_point * rc_destep
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
const struct m0_net_buffer_callbacks * nb_callbacks
Definition: net.h:1369
M0_INTERNAL int m0_rpc_packet_encode(struct m0_rpc_packet *p, struct m0_bufvec *bufvec)
Definition: packet.c:221
#define PRIi32
Definition: types.h:67
uint64_t ri_nr_sent_max
Definition: item.h:146
int(* fo_packet_ready)(struct m0_rpc_packet *p)
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
Definition: net.h:1272
struct m0_rpc_machine machine
Definition: mdstore.c:58
M0_INTERNAL void m0_bufvec_free_aligned(struct m0_bufvec *bufvec, unsigned shift)
Definition: vec.c:436
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
static void buf_send_cb(const struct m0_net_buffer_event *ev)
Definition: frmops.c:370
M0_INTERNAL bool m0_rpc_item_is_reply(const struct m0_rpc_item *item)
Definition: item.c:516
static int rpc_buffer_init(struct rpc_buffer *rpcbuf, struct m0_rpc_packet *p)
Definition: frmops.c:185
#define PRIu32
Definition: types.h:66
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:107
#define M0_CNT_INC(cnt)
Definition: arith.h:226
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL void m0_rpc_item_sent_invoke(struct m0_rpc_item *item)
Definition: item.c:1766
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
struct m0_rpc_packet * rb_packet
Definition: frmops.c:85
static const struct m0_bob_type rpc_buffer_bob_type
Definition: frmops.c:90
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
Definition: beck.c:130
#define PRIi64
Definition: types.h:59
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:247
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
struct m0_rpc_session * ri_session
Definition: item.h:147
static void item_fail(struct m0_rpc_packet *p, struct m0_rpc_item *item, int rc)
Definition: frmops.c:565
M0_INTERNAL void m0_rpc_packet_discard(struct m0_rpc_packet *packet)
Definition: packet.c:134
struct m0_rpc_frm * rp_frm
#define for_each_item_in_packet(item, packet)
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
uint64_t rm_active_nb
Definition: rpc_machine.h:139
static uint32_t buf_size
Definition: ad.c:75
#define out(...)
Definition: gen.c:41
struct m0_rpc_stats rm_stats
Definition: rpc_machine.h:96
uint32_t ri_nr_sent
Definition: item.h:183
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
M0_BOB_DEFINE(static, &rpc_buffer_bob_type, rpc_buffer)
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL m0_bcount_t m0_rpc_max_seg_size(struct m0_net_domain *ndom)
Definition: rpc.c:284
uint32_t sm_state
Definition: sm.h:307
static void rpc_buffer_fini(struct rpc_buffer *rpcbuf)
Definition: frmops.c:348
static int net_buffer_allocate(struct m0_net_buffer *netbuf, struct m0_net_domain *ndom, m0_bcount_t buf_size)
Definition: frmops.c:225
M0_INTERNAL void m0_rpc_session_release(struct m0_rpc_session *session)
Definition: session.c:791
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
static void bufvec_geometry(struct m0_net_domain *ndom, m0_bcount_t buf_size, int32_t *out_nr_segments, m0_bcount_t *out_segment_size)
Definition: frmops.c:261
static uint64_t m0_align(uint64_t val, uint64_t alignment)
Definition: arith.h:170
const struct m0_rpc_frm_ops m0_rpc_frm_default_ops
Definition: frmops.c:65
#define end_for_each_item_in_packet
struct m0_net_end_point * nb_ep
Definition: net.h:1424
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
Definition: item.c:1188
m0_time_t ri_deadline
Definition: item.h:141
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)
Definition: ktime.c:73