Motr  M0
formation2.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FORMATION
24 #include "lib/trace.h"
25 #include "lib/misc.h" /* M0_SET0 */
26 #include "lib/memory.h"
27 #include "lib/tlist.h"
28 #include "motr/magic.h"
29 #include "lib/finject.h" /* M0_FI_ENABLED */
30 #include "reqh/reqh.h"
31 
32 #include "rpc/rpc_internal.h"
33 
38 static bool itemq_invariant(const struct m0_tl *q);
39 static m0_bcount_t itemq_nr_bytes_acc(const struct m0_tl *q);
40 
41 static enum m0_rpc_frm_itemq_type
42 frm_which_qtype(struct m0_rpc_frm *frm, const struct m0_rpc_item *item);
43 static bool frm_is_idle(const struct m0_rpc_frm *frm);
44 static void frm_insert(struct m0_rpc_frm *frm, struct m0_rpc_item *item);
45 static void frm_remove(struct m0_rpc_frm *frm, struct m0_rpc_item *item);
46 static void __itemq_insert(struct m0_tl *q, struct m0_rpc_item *new_item);
47 static void __itemq_remove(struct m0_rpc_item *item);
48 static void frm_balance(struct m0_rpc_frm *frm);
49 static bool frm_is_ready(const struct m0_rpc_frm *frm);
50 static void frm_fill_packet(struct m0_rpc_frm *frm, struct m0_rpc_packet *p);
52  struct m0_rpc_packet *p);
53 static int frm_packet_ready(struct m0_rpc_frm *frm, struct m0_rpc_packet *p);
54 static void frm_try_merging_item(struct m0_rpc_frm *frm,
55  struct m0_rpc_item *item,
56  m0_bcount_t limit);
57 
58 static bool item_less_or_equal(const struct m0_rpc_item *i0,
59  const struct m0_rpc_item *i1);
60 static void item_move_to_urgent_queue(struct m0_rpc_frm *frm,
61  struct m0_rpc_item *item);
62 
64  const struct m0_rpc_frm *frm);
66  const struct m0_rpc_packet *p,
67  const struct m0_rpc_frm *frm);
68 
69 static bool item_supports_merging(const struct m0_rpc_item *item);
70 
71 static void drop_all_items(struct m0_rpc_frm *frm);
72 
73 static bool
75 
76 static const char *str_qtype[] = {
77  [FRMQ_URGENT] = "URGENT",
78  [FRMQ_WAITING] = "WAITING",
79 };
80 
82 
/** Pointer to the first item queue of @a frm. */
#define frm_first_itemq(frm) (&(frm)->f_itemq[0])

/** Pointer one past the last item queue of @a frm (never dereferenced). */
#define frm_end_itemq(frm) \
	(frm_first_itemq(frm) + ARRAY_SIZE((frm)->f_itemq))

/**
 * Iterates @a itemq over every item queue embedded in @a frm, in array
 * order. @a itemq must be an lvalue of type "struct m0_tl *".
 */
#define for_each_itemq_in_frm(itemq, frm)       \
for ((itemq) = frm_first_itemq(frm);            \
     (itemq) < frm_end_itemq(frm);              \
     (itemq)++)
90 
91 M0_TL_DESCR_DEFINE(itemq, "rpc_itemq", M0_INTERNAL, struct m0_rpc_item,
92  ri_iq_link, ri_magic, M0_RPC_ITEM_MAGIC,
94 M0_TL_DEFINE(itemq, M0_INTERNAL, struct m0_rpc_item);
95 
96 static bool frm_invariant(const struct m0_rpc_frm *frm)
97 {
98  m0_bcount_t nr_bytes_acc = 0;
99  uint64_t nr_items = 0;
100 
101  return frm != NULL &&
105  frm->f_ops != NULL &&
108  ({
109  const struct m0_tl *q = &frm->f_itemq[i];
110 
111  nr_items += itemq_tlist_length(q);
112  nr_bytes_acc += itemq_nr_bytes_acc(q);
113  itemq_invariant(q); })) &&
114  frm->f_nr_items == nr_items &&
115  frm->f_nr_bytes_accumulated == nr_bytes_acc;
116 }
117 
118 static bool itemq_invariant(const struct m0_tl *q)
119 {
120  return q != NULL &&
121  m0_tl_forall(itemq, item, q, ({
122  const struct m0_rpc_item *prev =
123  itemq_tlist_prev(q, item);
124  ergo(prev != NULL,
125  item_less_or_equal(prev, item));
126  }));
127 }
128 
132 static bool item_less_or_equal(const struct m0_rpc_item *i0,
133  const struct m0_rpc_item *i1)
134 {
135  return i0->ri_prio > i1->ri_prio ||
136  (i0->ri_prio == i1->ri_prio &&
137  i0->ri_deadline <= i1->ri_deadline);
138 }
139 
143 static m0_bcount_t itemq_nr_bytes_acc(const struct m0_tl *q)
144 {
145  struct m0_rpc_item *item;
147 
148  size = 0;
149  m0_tl_for(itemq, q, item)
151  m0_tl_endfor;
152 
153  return size;
154 }
155 
156 M0_INTERNAL struct m0_rpc_chan *frm_rchan(const struct m0_rpc_frm *frm)
157 {
158  return container_of(frm, struct m0_rpc_chan, rc_frm);
159 }
160 
161 M0_INTERNAL struct m0_rpc_machine *frm_rmachine(const struct m0_rpc_frm *frm)
162 {
163  return frm_rchan(frm)->rc_rpc_machine;
164 }
165 
166 static bool frm_rmachine_is_locked(const struct m0_rpc_frm *frm)
167 {
169 }
170 
171 M0_INTERNAL void m0_rpc_frm_constraints_get_defaults(struct
173 {
174  M0_ENTRY();
175 
177  c->fc_max_nr_packets_enqed = 100;
178  c->fc_max_nr_segments = 128;
179  c->fc_max_packet_size = 4096;
180  c->fc_max_nr_bytes_accumulated = 4096;
181 
182  M0_LEAVE();
183 }
184 
185 static bool
187 {
189  return constraints != NULL;
190 }
191 
192 static bool frm_is_idle(const struct m0_rpc_frm *frm)
193 {
194  return frm->f_nr_items == 0 && frm->f_nr_packets_enqed == 0;
195 }
196 
197 M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm,
199  const struct m0_rpc_frm_ops *ops)
200 {
201  struct m0_tl *q;
202 
203  M0_ENTRY("frm: %p", frm);
204  M0_PRE(frm != NULL &&
205  ops != NULL &&
207 
208  M0_SET0(frm);
209  frm->f_ops = ops;
210  frm->f_constraints = *constraints; /* structure instance copy */
212 
214  itemq_tlist_init(q);
215 
216  frm->f_state = FRM_IDLE;
217 
219  M0_LEAVE();
220 }
221 
222 M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
223 {
224  struct m0_tl *q;
225 
226  M0_ENTRY("frm: %p", frm);
228  M0_LOG(M0_DEBUG, "frm state: %d", frm->f_state);
229 
233  itemq_tlist_fini(q);
234 
236  frm->f_magic = 0;
237 
238  M0_LEAVE();
239 }
240 
241 static void drop_all_items(struct m0_rpc_frm *frm)
242 {
243  struct m0_rpc_item *item;
244  struct m0_tl *q;
245  int i;
246 
247  for (i = 0; i < FRMQ_NR_QUEUES; i++) {
248  q = &frm->f_itemq[i];
249  m0_tl_for(itemq, q, item) {
252  frm_remove(frm, item);
253  m0_rpc_item_failed(item, -ECANCELED);
255  } m0_tl_endfor;
256  M0_ASSERT(itemq_tlist_is_empty(q));
257  }
258 }
259 
260 M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm,
261  struct m0_rpc_item *item)
262 {
263  M0_ENTRY("frm: %p item: %p", frm, item);
266 
267  frm_insert(frm, item);
268  frm_balance(frm);
269 
270  M0_LEAVE();
271 }
272 
273 static void frm_insert(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
274 {
275  enum m0_rpc_frm_itemq_type qtype;
276  struct m0_tl *q;
277  int rc;
278 
279  M0_ENTRY("frm: %p item: %p size %zu opcode %lu xid %lu",
280  frm, item, item->ri_size,
281  (unsigned long)item->ri_type->rit_opcode,
282  (unsigned long)item->ri_header.osr_xid);
283  M0_PRE(item != NULL && !itemq_tlink_is_in(item));
284  M0_LOG(M0_DEBUG, "priority: %d", item->ri_prio);
285 
286  qtype = frm_which_qtype(frm, item);
287  q = &frm->f_itemq[qtype];
288 
291 
294  item->ri_frm = frm;
295  if (frm->f_state == FRM_IDLE)
296  frm->f_state = FRM_BUSY;
297 
298 up:
301  M0_LOG(M0_DEBUG, "%p Starting deadline timer", item);
303  /* For resent item, we may need to "re-arm"
304  ri_deadline_timeout.
305  */
310  item->ri_deadline,
311  M0_RPC_ITEM_URGENT, 0);
312  if (rc != 0) {
313  M0_LOG(M0_NOTICE, "%p failed to start deadline timer",
314  item);
315  item->ri_deadline = 0;
317  goto up;
318  }
319  } else {
321  }
322  M0_LEAVE("nr_items: %llu bytes: %llu",
323  (unsigned long long)frm->f_nr_items,
324  (unsigned long long)frm->f_nr_bytes_accumulated);
325 }
326 
327 M0_INTERNAL bool item_is_in_waiting_queue(const struct m0_rpc_item *item,
328  const struct m0_rpc_frm *frm)
329 {
330  return item->ri_itemq == &frm->f_itemq[FRMQ_WAITING];
331 }
332 
337 static enum m0_rpc_frm_itemq_type
339 {
340  enum m0_rpc_frm_itemq_type qtype;
341  bool deadline_passed;
342 
343  M0_ENTRY("item: %p", item);
344  M0_PRE(item != NULL);
345 
346  deadline_passed = m0_time_now() >= item->ri_deadline;
347 
349  "deadline: "TIME_F" deadline_passed: %s",
351  m0_bool_to_str(deadline_passed));
352 
353  qtype = deadline_passed ? FRMQ_URGENT : FRMQ_WAITING;
354  M0_LEAVE("qtype: %s", str_qtype[qtype]);
355  return qtype;
356 }
357 
363 static void __itemq_insert(struct m0_tl *q, struct m0_rpc_item *new_item)
364 {
365  struct m0_rpc_item *item;
366 
367  M0_ENTRY();
368 
369  /* insertion sort. */
370  m0_tl_for(itemq, q, item) {
372  itemq_tlist_add_before(item, new_item);
373  break;
374  }
375  } m0_tl_endfor;
376  if (item == NULL)
377  itemq_tlist_add_tail(q, new_item);
378  new_item->ri_itemq = q;
379 
381  M0_LEAVE();
382 }
383 
385  struct m0_rpc_item *item)
386 {
387  M0_ENTRY("frm: %p item: %p", frm, item);
388 
390  frm_balance(frm);
391 
392  M0_LEAVE();
393 }
394 
396  struct m0_rpc_item *item)
397 {
398  M0_PRE(item != NULL);
399 
402 }
403 
409 static void frm_balance(struct m0_rpc_frm *frm)
410 {
411  struct m0_rpc_packet *p;
412  int packet_count;
413  int item_count;
414  int rc;
415 
416  M0_ENTRY("frm: %p", frm);
417 
420 
421  M0_LOG(M0_DEBUG, "ready: %s",
422  (char *)m0_bool_to_str(frm_is_ready(frm)));
423  packet_count = item_count = 0;
424 
425  if (M0_FI_ENABLED("do_nothing"))
426  return;
427 
428  while (frm_is_ready(frm)) {
429  M0_ALLOC_PTR(p);
430  if (p == NULL) {
431  M0_LOG(M0_ERROR, "Error: packet allocation failed");
432  break;
433  }
436  if (m0_rpc_packet_is_empty(p)) {
437  /* See FRM_BALANCE_NOTE_1 at the end of this function */
439  m0_free(p);
440  break;
441  }
442  ++packet_count;
443  item_count += p->rp_ow.poh_nr_items;
444  rc = frm_packet_ready(frm, p);
445  if (rc == 0) {
447  /*
448  * f_nr_packets_enqed will be decremented in packet
449  * done callback, see m0_rpc_frm_packet_done()
450  */
451  if (frm->f_state == FRM_IDLE)
452  frm->f_state = FRM_BUSY;
453  }
454  }
455 
457  M0_LEAVE("formed %d packet(s) [%d items]", packet_count, item_count);
458 }
459 /*
460  * FRM_BALANCE_NOTE_1
461  * This case can arise if:
462  * - Accumulated bytes are >= max_nr_bytes_accumulated,
463  * hence frm is READY
464  */
465 
472 static bool frm_is_ready(const struct m0_rpc_frm *frm)
473 {
474  const struct m0_rpc_frm_constraints *c;
475  bool has_urgent_items;
476 
477  M0_PRE(frm != NULL);
478 
479  if (M0_FI_ENABLED("ready"))
480  return true;
481  has_urgent_items =
482  !itemq_tlist_is_empty(&frm->f_itemq[FRMQ_URGENT]);
483 
484  c = &frm->f_constraints;
485  return frm->f_nr_packets_enqed < c->fc_max_nr_packets_enqed &&
486  (has_urgent_items ||
487  frm->f_nr_bytes_accumulated >= c->fc_max_nr_bytes_accumulated);
488 }
489 
495 static void frm_fill_packet(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
496 {
497  struct m0_rpc_item *item;
498  struct m0_tl *q;
499  m0_bcount_t limit;
500 
501  M0_ENTRY("frm: %p packet: %p", frm, p);
502 
504 
506  m0_tl_for(itemq, q, item) {
507  /* See FRM_FILL_PACKET_NOTE_1 at the end of this func */
508  if (available_space_in_packet(p, frm) == 0)
509  goto out;
511  continue;
513  /*
514  * Request might have been cancelled while in
515  * URGENT state
516  */
518  frm_remove(frm, item);
519  continue;
520  }
521  if (M0_FI_ENABLED("skip_oneway_items") &&
523  continue;
525  frm_remove(frm, item);
527  limit = available_space_in_packet(p, frm);
528  frm_try_merging_item(frm, item, limit);
529  }
534  } m0_tl_endfor;
535  }
537 out:
539  M0_LEAVE();
540 }
541 /*
542  * FRM_FILL_PACKET_NOTE_1
543  * I know that this loop is inefficient. But for now
544  * let's just stick to simplicity. We can optimize it
545  * later if need arises. --Amit
546  */
547 
549  const struct m0_rpc_frm *frm)
550 {
552  return frm->f_constraints.fc_max_packet_size - p->rp_size;
553 }
554 
556  const struct m0_rpc_packet *p,
557  const struct m0_rpc_frm *frm)
558 {
560 }
561 
562 static bool item_supports_merging(const struct m0_rpc_item *item)
563 {
564  M0_PRE(item->ri_type != NULL &&
565  item->ri_type->rit_ops != NULL);
566 
567  return item->ri_type->rit_ops->rito_try_merge != NULL;
568 }
569 
571  struct m0_rpc_packet *p)
572 {
574  struct m0_rpc_conn *conn;
575  struct m0_rpc_item_source *source;
576  struct m0_rpc_item *item;
577  m0_bcount_t available_space;
578  m0_bcount_t header_footer_size;
579 
580  M0_ENTRY();
581 
582  header_footer_size = m0_rpc_item_onwire_header_size +
584  m0_tl_for(rpc_conn, &machine->rm_outgoing_conns, conn) {
585  if (&conn->c_rpcchan->rc_frm != frm ||
587  continue;
588  M0_LOG(M0_DEBUG, "conn: %p", conn);
589  m0_tl_for(item_source, &conn->c_item_sources, source) {
590  M0_LOG(M0_DEBUG, "source: %p", source);
591  while (source->ris_ops->riso_has_item(source)) {
592  available_space = available_space_in_packet(p,
593  frm);
594  if (available_space <= header_footer_size)
595  goto out;
596  item = source->ris_ops->riso_get_item(source,
597  available_space - header_footer_size);
598  if (item == NULL)
599  break; /* next item source */
601  /*
602  * Rpc always acquires an *internal* reference
603  * to "all" items (Here sourced one-way items).
604  * This reference is released when the item is
605  * sent.
606  */
609  item->ri_nr_sent++;
611  M0_LOG(M0_DEBUG, "item: %p", item);
613  p, frm));
618  }
619  } m0_tl_endfor;
620  } m0_tl_endfor;
621 
622 out:
623  M0_LEAVE();
624 }
625 
626 M0_INTERNAL void m0_rpc_frm_remove_item(struct m0_rpc_frm *frm,
627  struct m0_rpc_item *item)
628 {
629  frm_remove(frm, item);
630 }
631 
632 static void frm_remove(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
633 {
634  M0_ENTRY("frm: %p item: %p", frm, item);
635  M0_PRE(frm != NULL && item != NULL);
636  M0_PRE(frm->f_nr_items > 0 && item->ri_itemq != NULL);
637 
639  item->ri_frm = NULL;
643 
644  if (frm_is_idle(frm))
645  frm->f_state = FRM_IDLE;
647  M0_LEAVE();
648 }
649 
650 static void __itemq_remove(struct m0_rpc_item *item)
651 {
652  itemq_tlink_del_fini(item);
653  item->ri_itemq = NULL;
654 }
655 
656 static void frm_try_merging_item(struct m0_rpc_frm *frm,
657  struct m0_rpc_item *item,
658  m0_bcount_t limit)
659 {
660  M0_ENTRY("frm: %p item: %p limit: %llu", frm, item,
661  (unsigned long long)limit);
663  M0_LEAVE();
664  return;
665 }
666 
670 static int frm_packet_ready(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
671 {
672  M0_ENTRY("frm: %p packet %p", frm, p);
673 
674  M0_PRE(frm != NULL && p != NULL && !m0_rpc_packet_is_empty(p));
676  M0_LOG(M0_DEBUG, "nr_items: %llu",
677  (unsigned long long)p->rp_ow.poh_nr_items);
678 
679  p->rp_frm = frm;
680  /* See packet_ready() in rpc/frmops.c */
681  return M0_RC(frm->f_ops->fo_packet_ready(p));
682 }
683 
684 M0_INTERNAL void m0_rpc_frm_run_formation(struct m0_rpc_frm *frm)
685 {
686  if (M0_FI_ENABLED("do_nothing"))
687  return;
688 
689  M0_ENTRY("frm: %p", frm);
692 
693  frm_balance(frm);
694 
695  M0_LEAVE();
696 }
697 
698 M0_INTERNAL void m0_rpc_frm_packet_done(struct m0_rpc_packet *p)
699 {
700  struct m0_rpc_frm *frm;
701 
702  M0_ENTRY("packet: %p", p);
704 
705  frm = p->rp_frm;
708 
710  M0_LOG(M0_DEBUG, "nr_packets_enqed: %llu",
711  (unsigned long long)frm->f_nr_packets_enqed);
712 
713  if (frm_is_idle(frm))
714  frm->f_state = FRM_IDLE;
715 
716  frm_balance(frm);
717 
718  M0_LEAVE();
719 }
720 
721 M0_INTERNAL struct m0_rpc_frm *session_frm(const struct m0_rpc_session *s)
722 {
723  return &s->s_conn->c_rpcchan->rc_frm;
724 }
725 
726 #undef M0_TRACE_SUBSYSTEM
727 
static void frm_remove(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:632
static bool item_less_or_equal(const struct m0_rpc_item *i0, const struct m0_rpc_item *i1)
Definition: formation2.c:132
M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
Definition: formation2.c:222
uint32_t rit_opcode
Definition: item.h:474
static struct m0_addb2_philter p
Definition: consumer.c:40
#define M0_PRE(cond)
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:728
struct m0_rpc_frm_constraints f_constraints
static struct m0_semaphore q
Definition: rwlock.c:55
static void frm_insert(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:273
#define NULL
Definition: misc.h:38
static void frm_try_merging_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item, m0_bcount_t limit)
Definition: formation2.c:656
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL void m0_rpc_frm_packet_done(struct m0_rpc_packet *p)
Definition: formation2.c:698
size_t ri_size
Definition: item.h:198
const struct m0_rpc_item_type_ops * rit_ops
Definition: item.h:476
M0_INTERNAL void m0_sm_timeout_init(struct m0_sm_timeout *to)
Definition: sm.c:667
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static m0_bcount_t available_space_in_packet(const struct m0_rpc_packet *p, const struct m0_rpc_frm *frm)
Definition: formation2.c:548
M0_TL_DEFINE(itemq, M0_INTERNAL, struct m0_rpc_item)
struct m0_tl rm_outgoing_conns
Definition: rpc_machine.h:95
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
Definition: item.c:742
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
struct m0_sm ri_sm
Definition: item.h:181
static bool frm_is_ready(const struct m0_rpc_frm *frm)
Definition: formation2.c:472
M0_INTERNAL struct m0_rpc_frm * session_frm(const struct m0_rpc_session *s)
Definition: formation2.c:721
static struct m0_rpc_frm_constraints constraints
Definition: formation2.c:35
static void __itemq_remove(struct m0_rpc_item *item)
Definition: formation2.c:650
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL void m0_rpc_packet_add_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
Definition: packet.c:141
static void frm_fill_packet(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
Definition: formation2.c:495
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
struct m0_sm_timeout ri_deadline_timeout
Definition: item.h:169
#define TIME_P(t)
Definition: time.h:45
M0_BASSERT(ARRAY_SIZE(str_qtype)==FRMQ_NR_QUEUES)
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_tl c_item_sources
Definition: conn.h:314
M0_INTERNAL struct m0_rpc_chan * frm_rchan(const struct m0_rpc_frm *frm)
Definition: formation2.c:156
struct m0_rpc_chan * c_rpcchan
Definition: conn.h:317
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_footer_size
Definition: item.c:105
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
M0_INTERNAL bool m0_rpc_packet_invariant(const struct m0_rpc_packet *p)
Definition: packet.c:82
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
Definition: item.c:104
return M0_RC(rc)
#define equi(a, b)
Definition: misc.h:297
#define M0_ASSERT_EX(cond)
#define M0_ENTRY(...)
Definition: trace.h:170
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
Definition: item.c:470
uint64_t osr_xid
Definition: onwire.h:105
static void item_move_to_urgent_queue(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:395
static bool frm_rmachine_is_locked(const struct m0_rpc_frm *frm)
Definition: formation2.c:166
M0_INTERNAL void m0_rpc_packet_init(struct m0_rpc_packet *p, struct m0_rpc_machine *rmach)
Definition: packet.c:103
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
#define TIME_F
Definition: time.h:44
int i
Definition: dir.c:1033
M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm, struct m0_rpc_frm_constraints *constraints, const struct m0_rpc_frm_ops *ops)
Definition: formation2.c:197
M0_INTERNAL void m0_rpc_packet_fini(struct m0_rpc_packet *p)
Definition: packet.c:122
static void frm_fill_packet_from_item_sources(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
Definition: formation2.c:570
static void drop_all_items(struct m0_rpc_frm *frm)
Definition: formation2.c:241
m0_bcount_t f_nr_bytes_accumulated
static bool item_will_exceed_packet_size(struct m0_rpc_item *item, const struct m0_rpc_packet *p, const struct m0_rpc_frm *frm)
Definition: formation2.c:555
M0_INTERNAL struct m0_rpc_machine * frm_rmachine(const struct m0_rpc_frm *frm)
Definition: formation2.c:161
#define for_each_itemq_in_frm(itemq, frm)
Definition: formation2.c:86
M0_INTERNAL void m0_rpc_frm_item_deadline_passed(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:384
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
Definition: item.c:704
#define M0_ASSERT(cond)
M0_INTERNAL const char * m0_bool_to_str(bool b)
Definition: misc.c:207
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
m0_time_t m0_time_now(void)
Definition: time.c:134
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
static struct m0_addb2_callback c
Definition: consumer.c:41
Definition: tlist.h:251
static bool frm_invariant(const struct m0_rpc_frm *frm)
Definition: formation2.c:96
struct m0_rpc_frm * ri_frm
Definition: item.h:226
static struct m0_rpc_item * new_item(int deadline, int kind)
Definition: formation2.c:202
bool(* rito_try_merge)(struct m0_rpc_item *container, struct m0_rpc_item *component, m0_bcount_t limit)
Definition: item.h:415
struct m0_rpc_machine * rc_rpc_machine
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
M0_INTERNAL void m0_rpc_frm_remove_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:626
M0_TL_DESCR_DEFINE(itemq, "rpc_itemq", M0_INTERNAL, struct m0_rpc_item, ri_iq_link, ri_magic, M0_RPC_ITEM_MAGIC, M0_RPC_ITEMQ_HEAD_MAGIC)
struct m0_rpc_frm rc_frm
struct m0_tl f_itemq[FRMQ_NR_QUEUES]
static enum m0_rpc_frm_itemq_type frm_which_qtype(struct m0_rpc_frm *frm, const struct m0_rpc_item *item)
Definition: formation2.c:338
int(* fo_packet_ready)(struct m0_rpc_packet *p)
M0_INTERNAL void m0_rpc_frm_run_formation(struct m0_rpc_frm *frm)
Definition: formation2.c:684
m0_rpc_frm_itemq_type
enum frm_state f_state
struct m0_rpc_conn conn
Definition: fsync.c:96
M0_INTERNAL void m0_rpc_frm_constraints_get_defaults(struct m0_rpc_frm_constraints *c)
Definition: formation2.c:171
struct m0_rpc_machine machine
Definition: mdstore.c:58
uint64_t f_nr_packets_enqed
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:260
#define m0_forall(var, nr,...)
Definition: misc.h:112
M0_INTERNAL int m0_sm_timeout_arm(struct m0_sm *mach, struct m0_sm_timeout *to, m0_time_t timeout, int state, uint64_t bitmask)
Definition: sm.c:674
static struct m0_rpc_frm * frm
Definition: formation2.c:34
static void frm_balance(struct m0_rpc_frm *frm)
Definition: formation2.c:409
M0_INTERNAL void m0_sm_timeout_fini(struct m0_sm_timeout *to)
Definition: sm.c:705
#define M0_CNT_INC(cnt)
Definition: arith.h:226
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
static bool item_supports_merging(const struct m0_rpc_item *item)
Definition: formation2.c:562
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static int frm_packet_ready(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
Definition: formation2.c:670
struct m0_tl * ri_itemq
Definition: item.h:225
static const char * str_qtype[]
Definition: formation2.c:76
m0_bcount_t size
Definition: di.c:39
M0_INTERNAL bool item_is_in_waiting_queue(const struct m0_rpc_item *item, const struct m0_rpc_frm *frm)
Definition: formation2.c:327
const struct m0_rpc_frm_ops * f_ops
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
static void __itemq_insert(struct m0_tl *q, struct m0_rpc_item *new_item)
Definition: formation2.c:363
#define out(...)
Definition: gen.c:41
static bool constraints_are_valid(const struct m0_rpc_frm_constraints *constraints)
Definition: formation2.c:186
uint32_t ri_nr_sent
Definition: item.h:183
struct m0_fom_ops ops
Definition: io_foms.c:623
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
M0_INTERNAL bool m0_sm_timeout_is_armed(const struct m0_sm_timeout *to)
Definition: sm.c:713
#define M0_PRE_EX(cond)
M0_INTERNAL bool m0_rpc_packet_is_empty(const struct m0_rpc_packet *p)
Definition: packet.c:214
Definition: net.c:93
static bool frm_is_idle(const struct m0_rpc_frm *frm)
Definition: formation2.c:192
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
static m0_bcount_t itemq_nr_bytes_acc(const struct m0_tl *q)
Definition: formation2.c:143
uint32_t sm_state
Definition: sm.h:307
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define M0_POST_EX(cond)
static int conn_state(const struct m0_rpc_conn *conn)
Definition: conn_internal.h:66
static bool itemq_invariant(const struct m0_tl *q)
Definition: formation2.c:118
m0_time_t ri_deadline
Definition: item.h:141
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735