Motr  M0
formation2.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "ut/ut.h"
24 #include "lib/mutex.h"
25 #include "lib/time.h"
26 #include "lib/memory.h"
27 #include "lib/misc.h"
28 #include "lib/finject.h"
29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
30 #include "lib/trace.h"
31 #include "sm/sm.h"
32 #include "rpc/rpc_internal.h"
33 
34 static struct m0_rpc_frm *frm;
36 static struct m0_rpc_machine rmachine;
37 static struct m0_rpc_chan rchan;
38 static struct m0_rpc_session session;
41 
42 extern const struct m0_sm_conf outgoing_item_sm_conf;
43 extern const struct m0_sm_conf incoming_item_sm_conf;
44 
45 static int frm_ut_init(void)
46 {
47  int rc;
48 
49  frm = NULL;
51  M0_SET0(&rmachine);
52  M0_SET0(&rchan);
53  M0_SET0(&session);
54 
60  frm = &rchan.rc_frm;
61  rpc_conn_tlist_init(&rmachine.rm_outgoing_conns);
63  rmachine.rm_stopping = false;
66  "rpc_worker");
67  M0_ASSERT(rc == 0);
69  return 0;
70 }
71 
72 static int frm_ut_fini(void)
73 {
74  rmachine.rm_stopping = true;
79  rpc_conn_tlist_fini(&rmachine.rm_outgoing_conns);
80  return 0;
81 }
82 
/* Capacity of packet_stack[], which is declared with STACK_SIZE elements. */
83 enum { STACK_SIZE = 100 };
84 
86 static int top = 0;
87 
88 static void packet_stack_push(struct m0_rpc_packet *p)
89 {
90  M0_UT_ASSERT(p != NULL);
92  packet_stack[top] = p;
93  ++top;
94 }
95 
96 static struct m0_rpc_packet *packet_stack_pop(void)
97 {
98  M0_UT_ASSERT(top > 0 && top < STACK_SIZE);
99  --top;
100  return packet_stack[top];
101 }
102 
103 static bool packet_stack_is_empty(void)
104 {
105  return top == 0;
106 }
107 
109 static int item_bind_count;
110 
111 static void flags_reset(void)
112 {
113  packet_ready_called = false;
114  item_bind_count = 0;
115 }
116 
117 static int packet_ready(struct m0_rpc_packet *p)
118 {
119  M0_UT_ASSERT(frm_rmachine(p->rp_frm) == &rmachine);
120  M0_UT_ASSERT(frm_rchan(p->rp_frm) == &rchan);
122  packet_ready_called = true;
123  return 0;
124 }
125 
126 static struct m0_rpc_frm_ops frm_ops = {
128 };
129 
130 static void frm_init_test(void)
131 {
135 }
136 
138 {
139  return 10;
140 }
141 
143  struct m0_rpc_item *component,
144  m0_bcount_t limit)
145 {
146  return false;
147 }
148 
/**
 * "Get" callback installed as rito_item_get in the test item-type ops tables.
 * Deliberately has no effect.
 */
static void item_get_noop(struct m0_rpc_item *item)
{
	(void)item; /* intentionally unused */
}
153 
/**
 * "Put" callback installed as rito_item_put in the test item-type ops tables.
 * Deliberately has no effect.
 */
static void item_put_noop(struct m0_rpc_item *item)
{
	(void)item; /* intentionally unused */
}
158 
161  .rito_try_merge = twoway_item_try_merge,
162  .rito_item_get = item_get_noop,
163  .rito_item_put = item_put_noop,
164 };
165 
166 static struct m0_rpc_item_type twoway_item_type = {
168  .rit_ops = &twoway_item_type_ops,
169 };
170 
172 {
173  return 20;
174 }
175 
178  .rito_item_get = item_get_noop,
179  .rito_item_put = item_put_noop,
180 };
181 
182 static struct m0_rpc_item_type oneway_item_type = {
184  .rit_ops = &oneway_item_type_ops,
185 };
186 
/** Deadline classes accepted by new_item() as its "deadline" argument. */
enum {
	TIMEDOUT = 1,
	WAITING,
	NEVER,
};

/** Item kinds accepted by new_item() as its "kind" argument. */
enum {
	NORMAL = 1,
	ONEWAY,
};
195 
196 static m0_time_t timeout; /* nano seconds */
197 static void set_timeout(uint64_t milli)
198 {
199  timeout = milli * 1000 * 1000;
200 }
201 
202 static struct m0_rpc_item *new_item(int deadline, int kind)
203 {
204  struct m0_rpc_item_type *itype;
205  struct m0_rpc_item *item;
206  static struct m0_sm_conf sm_conf;
207 
208  M0_UT_ASSERT(M0_IN(deadline, (TIMEDOUT, WAITING, NEVER)));
209  M0_UT_ASSERT(M0_IN(kind, (NORMAL, ONEWAY)));
210 
212  M0_UT_ASSERT(item != NULL);
213 
214  itype = kind == ONEWAY ? &oneway_item_type : &twoway_item_type;
215  m0_rpc_item_init(item, itype);
217 
219  sm_conf = *item->ri_sm.sm_conf;
221  item->ri_sm.sm_conf = &sm_conf;
222 
223  switch (deadline) {
224  case TIMEDOUT:
225  item->ri_deadline = 0;
226  break;
227  case NEVER:
229  break;
230  case WAITING:
232  break;
233  }
235  item->ri_session = &session;
236 
237  return item;
238 }
239 
240 static void
241 check_frm(enum frm_state state, uint64_t nr_items, uint64_t nr_packets)
242 {
243  M0_UT_ASSERT(frm->f_state == state &&
244  frm->f_nr_items == nr_items &&
245  frm->f_nr_packets_enqed == nr_packets);
246 }
247 
249 {
250  struct m0_rpc_packet *p;
251 
252  p = packet_stack_pop();
255  check_frm(FRM_BUSY, 0, 1);
258  check_frm(FRM_IDLE, 0, 0);
259 }
260 
261 static void perform_test(int deadline, int kind)
262 {
263  struct m0_rpc_item *item;
264 
265  set_timeout(100);
266  item = new_item(deadline, kind);
267  flags_reset();
269  if (deadline == WAITING) {
270  int result;
271 
273  check_frm(FRM_BUSY, 1, 0);
274  /* Allow RPC worker to process timeout AST */
276  /*
277  * The original code slept for 2*timeout here. This is
278  * unreliable and led to spurious assertion failures
279  * below. Explicitly wait until the item enters an
280  * appropriate state.
281  */
282  result = m0_rpc_item_timedwait(item,
288  M0_TIME_NEVER);
289  M0_UT_ASSERT(result == 0);
291  }
295  m0_free(item);
296 }
297 
298 static void frm_test1(void)
299 {
300  struct m0_rpc_item *item;
301  /*
302  * Timedout item triggers immediate formation.
303  * Waiting items do not trigger immediate formation, but they are
304  * formed once deadline is passed.
305  */
306  M0_ENTRY();
307 
308  /* Do not let formation trigger because of size limit */
310 
315 
316  /* Test: item is moved to URGENT state when call to m0_sm_timeout_arm()
317  fails to start item->ri_deadline_timeout in frm_insert().
318  */
319  set_timeout(100);
321  flags_reset();
322  m0_fi_enable_once("m0_sm_timeout_arm", "failed");
326  m0_free(item);
327 
328  M0_LEAVE();
329 }
330 
331 static void perform_test2(int kind)
332 {
333  enum { N = 4 };
334  struct m0_rpc_item *items[N];
335  struct m0_rpc_packet *p;
336  int i;
337  m0_bcount_t item_size;
338 
339  for (i = 0; i < N; ++i)
340  items[i] = new_item(WAITING, kind);
341  item_size = m0_rpc_item_size(items[0]);
342  /* include all ready items */
344  /*
345  * set fc_max_nr_bytes_accumulated such that, formation triggers
346  * when the last item from items[] is enqueued
347  */
349  (N - 1) * item_size + item_size / 2;
350 
351  flags_reset();
352  for (i = 0; i < N - 1; ++i) {
353  m0_rpc_frm_enq_item(frm, items[i]);
355  check_frm(FRM_BUSY, i + 1, 0);
356  }
357  m0_rpc_frm_enq_item(frm, items[N - 1]);
359  check_frm(FRM_BUSY, 0, 1);
360 
361  p = packet_stack_pop();
363  for (i = 0; i < N; ++i)
365 
367  check_frm(FRM_IDLE, 0, 0);
368 
370  for (i = 0; i < N; ++i) {
371  m0_rpc_item_fini(items[i]);
372  m0_free(items[i]);
373  }
374 }
375 
376 static void frm_test2(void)
377 {
378  /* formation triggers when accumulated bytes exceed limit */
379 
380  M0_ENTRY();
381 
382  set_timeout(999);
383 
386 
387  M0_LEAVE();
388 }
389 
390 static void frm_test3(void)
391 {
392  /* If max_nr_packets_enqed is reached, formation is stopped */
393  struct m0_rpc_item *item;
394  uint64_t saved;
395 
396  M0_ENTRY();
397 
398 
402  flags_reset();
405  check_frm(FRM_BUSY, 1, 0);
406 
410 
413  m0_free(item);
414 
415  M0_LEAVE();
416 }
417 
418 static void frm_do_test5(const int N, const int ITEMS_PER_PACKET)
419 {
420  /* Multiple packets are formed if ready items don't fit in one packet */
421  struct m0_rpc_item *items[N];
422  struct m0_rpc_packet *p;
423  m0_bcount_t saved_max_nr_bytes_acc;
424  m0_bcount_t saved_max_packet_size;
425  int nr_packets;
426  int i;
427 
428  M0_ENTRY("N: %d ITEMS_PER_PACKET: %d", N, ITEMS_PER_PACKET);
429 
430  for (i = 0; i < N; ++i)
431  items[i] = new_item(WAITING, NORMAL);
432 
433  saved_max_nr_bytes_acc = frm->f_constraints.fc_max_nr_bytes_accumulated;
435 
436  flags_reset();
437  for (i = 0; i < N; ++i)
438  m0_rpc_frm_enq_item(frm, items[i]);
439 
441  check_frm(FRM_BUSY, N, 0);
442 
443  saved_max_packet_size = frm->f_constraints.fc_max_packet_size;
444  /* Each packet should carry ITEMS_PER_PACKET items */
446  ITEMS_PER_PACKET * m0_rpc_item_size(items[0]) +
449  /* trigger formation so that all items are formed */
452  nr_packets = N / ITEMS_PER_PACKET + (N % ITEMS_PER_PACKET != 0 ? 1 : 0);
453  M0_UT_ASSERT(packet_ready_called && top == nr_packets);
454  check_frm(FRM_BUSY, 0, nr_packets);
455 
456  for (i = 0; i < nr_packets; ++i) {
457  p = packet_stack[i];
458  if (N % ITEMS_PER_PACKET == 0 ||
459  i != nr_packets - 1)
460  M0_UT_ASSERT(p->rp_ow.poh_nr_items == ITEMS_PER_PACKET);
461  else
462  M0_UT_ASSERT(p->rp_ow.poh_nr_items == N %
463  ITEMS_PER_PACKET);
464  (void)packet_stack_pop();
467  }
468  check_frm(FRM_IDLE, 0, 0);
469  for (i = 0; i < N; ++i) {
470  m0_rpc_item_fini(items[i]);
471  m0_free(items[i]);
472  }
473  frm->f_constraints.fc_max_packet_size = saved_max_packet_size;
474  frm->f_constraints.fc_max_nr_bytes_accumulated = saved_max_nr_bytes_acc;
476 
477  M0_LEAVE();
478 }
479 
/**
 * Runs frm_do_test5() over a fixed list of (item count, items per packet)
 * combinations, in the order given by the table.
 */
static void frm_test5(void)
{
	static const struct {
		int nr_items;
		int items_per_packet;
	} cases[] = {
		{ 7, 3 },
		{ 7, 6 },
		{ 8, 8 },
		{ 8, 2 },
		{ 4, 1 },
	};
	unsigned idx;

	M0_ENTRY();
	for (idx = 0; idx < sizeof cases / sizeof cases[0]; ++idx)
		frm_do_test5(cases[idx].nr_items, cases[idx].items_per_packet);
	M0_LEAVE();
}
490 
491 static void frm_test6(void)
492 {
493  /* If packet allocation fails then packet is not formed and items
494  remain in frm */
495  struct m0_rpc_item *item;
496 
497  M0_ENTRY();
498 
499  flags_reset();
501 
502  m0_fi_enable_once("m0_alloc", "fail_allocation");
503 
506  check_frm(FRM_BUSY, 1, 0);
507 
508  /* this time allocation succeeds */
511  check_frm(FRM_BUSY, 0, 1);
512 
515  m0_free(item);
516 
517  M0_LEAVE();
518 }
519 
520 static void frm_test7(void)
521 {
522  /* higher priority item is added to packet before lower priority */
523  struct m0_rpc_item *item1;
524  struct m0_rpc_item *item2;
525  m0_bcount_t saved_max_packet_size;
526  int saved_max_nr_packets_enqed;
527 
528  M0_ENTRY();
529 
530  item1 = new_item(TIMEDOUT, NORMAL);
531  item2 = new_item(TIMEDOUT, NORMAL);
532  item1->ri_prio = M0_RPC_ITEM_PRIO_MIN;
533  item2->ri_prio = M0_RPC_ITEM_PRIO_MAX;
534 
535  /* Only 1 item should be included per packet */
536  saved_max_packet_size = frm->f_constraints.fc_max_packet_size;
540  m0_rpc_item_size(item1) / 2;
541 
542  saved_max_nr_packets_enqed = frm->f_constraints.fc_max_nr_packets_enqed;
543  frm->f_constraints.fc_max_nr_packets_enqed = 0; /* disable formation */
544 
545  flags_reset();
546 
547  m0_rpc_frm_enq_item(frm, item1);
549  check_frm(FRM_BUSY, 1, 0);
550 
551  m0_rpc_frm_enq_item(frm, item2);
553  check_frm(FRM_BUSY, 2, 0);
554 
555  /* Enable formation */
556  frm->f_constraints.fc_max_nr_packets_enqed = saved_max_nr_packets_enqed;
558 
559  /* Two packets should be generated */
561  check_frm(FRM_BUSY, 0, 2);
562 
563  /* First packet should be carrying item2 because it has higher
564  priority
565  */
568 
571 
575 
576  m0_rpc_item_fini(item1);
577  m0_rpc_item_fini(item2);
578  m0_free(item1);
579  m0_free(item2);
580 
581  frm->f_constraints.fc_max_packet_size = saved_max_packet_size;
582 
583  M0_LEAVE();
584 }
585 
/*
 * Test: with formation disabled, enqueue N items whose deadline class, kind,
 * priority and timeout are all drawn from m0_rnd(); then re-enable formation
 * and pop packets until the packet stack is empty, finalising and freeing
 * every item afterwards. The two constraints it modifies are restored at the
 * end.
 *
 * NOTE(review): this listing has gaps in its embedded line numbering (625,
 * 631-633, 638-639), so some statements of this function are not visible
 * here.
 */
586 static void frm_test8(void)
587 {
588  /* Add items with random priority and random deadline */
589  enum { N = 100 };
590  enum m0_rpc_item_priority prio;
591  struct m0_rpc_packet *p;
592  struct m0_rpc_item *items[N];
593  m0_bcount_t saved_max_nr_bytes_acc;
594  uint64_t seed_deadline;
595  uint64_t seed_prio;
596  uint64_t seed_kind;
597  m0_time_t seed_timeout;
598  m0_time_t _timeout;
599  int saved_max_nr_packets_enqed;
600  int i;
601  int deadline;
602  int kind;
603 
604  saved_max_nr_packets_enqed = frm->f_constraints.fc_max_nr_packets_enqed;
605  frm->f_constraints.fc_max_nr_packets_enqed = 0; /* disable formation */
606 
607  /* start with some random seed */
     /*
      * NOTE(review): only three of the four seeds are assigned here;
      * seed_timeout is left unassigned yet its address is passed to
      * m0_rnd() in the loop below, which presumably reads it. Give it a
      * fixed starting value like the others so the run is reproducible.
      */
608  seed_deadline = 13;
609  seed_kind = 17;
610  seed_prio = 57;
611 
612  flags_reset();
613  for (i = 0; i < N; ++i) {
     /* "+ 1" shifts the m0_rnd() result so the smallest value is 1,
        matching TIMEDOUT and NORMAL. */
614  deadline = m0_rnd(3, &seed_deadline) + 1;
615  kind = m0_rnd(2, &seed_kind) + 1;
616  prio = m0_rnd(M0_RPC_ITEM_PRIO_NR, &seed_prio);
617  _timeout = m0_rnd(1000, &seed_timeout);
618 
     /* set_timeout() takes milliseconds. */
619  set_timeout(_timeout);
620 
621  items[i] = new_item(deadline, kind);
622  items[i]->ri_prio = prio;
623 
624  m0_rpc_frm_enq_item(frm, items[i]);
     /* Formation is disabled, so every enqueued item stays queued. */
626  check_frm(FRM_BUSY, i + 1, 0);
627  }
628  frm->f_constraints.fc_max_nr_packets_enqed = ~0; /* enable formation */
629  saved_max_nr_bytes_acc = frm->f_constraints.fc_max_nr_bytes_accumulated;
630  /* Make frm to form all items */
     /* All items formed: none queued, one enqueued packet per stack entry. */
634  check_frm(FRM_BUSY, 0, top);
635 
636  while (!packet_stack_is_empty()) {
637  p = packet_stack_pop();
640  }
641  check_frm(FRM_IDLE, 0, 0);
642  for (i = 0; i < N; i++) {
643  m0_rpc_item_fini(items[i]);
644  m0_free(items[i]);
645  }
646 
     /* Restore the constraints changed above. */
647  frm->f_constraints.fc_max_nr_packets_enqed = saved_max_nr_packets_enqed;
648  frm->f_constraints.fc_max_nr_bytes_accumulated = saved_max_nr_bytes_acc;
649 
     /*
      * NOTE(review): no matching M0_ENTRY() appears in the visible lines of
      * this function, unlike the other tests in this file.
      */
650  M0_LEAVE();
651 }
652 
653 static void frm_fini_test(void)
654 {
657 }
658 
/*
 * Unit-test suite descriptor for the RPC formation tests: names the suite,
 * registers frm_ut_init()/frm_ut_fini() in the ts_init/ts_fini slots, and
 * lists the test cases in a table terminated by a { NULL, NULL } entry.
 * "frm-init" is listed first and "frm-fini" last; presumably the framework
 * runs the entries in table order -- confirm against ut/ut.h.
 */
659 struct m0_ut_suite frm_ut = {
660  .ts_name = "rpc-formation-ut",
661  .ts_init = frm_ut_init,
662  .ts_fini = frm_ut_fini,
663  .ts_tests = {
664  { "frm-init", frm_init_test},
665  { "frm-test1", frm_test1 },
666  { "frm-test2", frm_test2 },
667  { "frm-test3", frm_test3 },
668  { "frm-test5", frm_test5 },
669  { "frm-test6", frm_test6 },
670  { "frm-test7", frm_test7 },
671  { "frm-test8", frm_test8 },
672  { "frm-fini", frm_fini_test},
673  { NULL, NULL }
674  }
675 };
676 M0_EXPORTED(frm_ut);
static void check_ready_packet_has_item(struct m0_rpc_item *item)
Definition: formation2.c:248
M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
Definition: formation2.c:222
static struct m0_rpc_stats saved
Definition: item.c:54
static void frm_test8(void)
Definition: formation2.c:586
static struct m0_addb2_philter p
Definition: consumer.c:40
static void frm_init_test(void)
Definition: formation2.c:130
static void frm_test1(void)
Definition: formation2.c:298
const struct m0_sm_conf incoming_item_sm_conf
Definition: item.c:60
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
struct m0_rpc_frm_constraints f_constraints
#define NULL
Definition: misc.h:38
static struct m0_rpc_item_type oneway_item_type
Definition: formation2.c:40
static struct m0_rpc_chan rchan
Definition: formation2.c:37
static void frm_test6(void)
Definition: formation2.c:491
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
M0_INTERNAL void m0_rpc_frm_packet_done(struct m0_rpc_packet *p)
Definition: formation2.c:698
static bool packet_stack_is_empty(void)
Definition: formation2.c:103
Definition: sm.h:350
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
uint64_t m0_time_t
Definition: time.h:37
M0_LEAVE()
struct m0_container container
struct m0_tl rm_outgoing_conns
Definition: rpc_machine.h:95
static struct m0_rpc_item_type_ops oneway_item_type_ops
Definition: formation2.c:176
struct m0_clink s_clink
Definition: sm.h:516
struct m0_sm ri_sm
Definition: item.h:181
static struct m0_rpc_frm_constraints constraints
Definition: formation2.c:35
struct m0_sm_conf rit_incoming_conf
Definition: item.h:483
#define M0_BITS(...)
Definition: misc.h:236
Definition: xcode.c:59
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
m0_rpc_item_priority
Definition: item.h:51
Definition: ut.h:77
#define N(i)
void m0_rpc_item_init(struct m0_rpc_item *item, const struct m0_rpc_item_type *itype)
Definition: item.c:364
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL struct m0_rpc_chan * frm_rchan(const struct m0_rpc_frm *frm)
Definition: formation2.c:156
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
static void frm_test7(void)
Definition: formation2.c:520
static void packet_stack_push(struct m0_rpc_packet *p)
Definition: formation2.c:88
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
static int top
Definition: formation2.c:86
#define M0_ENTRY(...)
Definition: trace.h:170
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
Definition: item.c:470
void m0_rpc_item_fini(struct m0_rpc_item *item)
Definition: item.c:394
int i
Definition: dir.c:1033
M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm, struct m0_rpc_frm_constraints *constraints, const struct m0_rpc_frm_ops *ops)
Definition: formation2.c:197
static int packet_ready(struct m0_rpc_packet *p)
Definition: formation2.c:117
static void flags_reset(void)
Definition: formation2.c:111
static m0_bcount_t twoway_item_size(const struct m0_rpc_item *item)
Definition: formation2.c:137
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
m0_bcount_t(* rito_payload_size)(const struct m0_rpc_item *item)
Definition: item.h:385
M0_INTERNAL uint64_t m0_rnd(uint64_t max, uint64_t *seed)
Definition: misc.c:115
M0_INTERNAL struct m0_rpc_machine * frm_rmachine(const struct m0_rpc_frm *frm)
Definition: formation2.c:161
static void frm_test2(void)
Definition: formation2.c:376
static void perform_test2(int kind)
Definition: formation2.c:331
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
Definition: item.c:704
static int frm_ut_init(void)
Definition: formation2.c:45
#define M0_ASSERT(cond)
struct m0_sm_conf rit_outgoing_conf
Definition: item.h:484
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
static struct m0_rpc_machine rmachine
Definition: formation2.c:36
static void frm_test5(void)
Definition: formation2.c:480
static m0_time_t timeout
Definition: formation2.c:196
static struct m0_rpc_item * new_item(int deadline, int kind)
Definition: formation2.c:202
struct m0_rpc_machine * rc_rpc_machine
static void item_get_noop(struct m0_rpc_item *item)
Definition: formation2.c:149
static void frm_test3(void)
Definition: formation2.c:390
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
static bool twoway_item_try_merge(struct m0_rpc_item *container, struct m0_rpc_item *component, m0_bcount_t limit)
Definition: formation2.c:142
static void perform_test(int deadline, int kind)
Definition: formation2.c:261
struct m0_rpc_frm rc_frm
static m0_bcount_t oneway_item_size(const struct m0_rpc_item *item)
Definition: formation2.c:171
int(* fo_packet_ready)(struct m0_rpc_packet *p)
M0_INTERNAL void m0_rpc_frm_run_formation(struct m0_rpc_frm *frm)
Definition: formation2.c:684
static struct m0_rpc_packet * packet_stack_pop(void)
Definition: formation2.c:96
enum frm_state f_state
M0_INTERNAL void m0_rpc_frm_constraints_get_defaults(struct m0_rpc_frm_constraints *c)
Definition: formation2.c:171
static int item_bind_count
Definition: formation2.c:109
uint64_t f_nr_packets_enqed
struct m0_thread rm_worker
Definition: rpc_machine.h:129
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:260
static void frm_fini_test(void)
Definition: formation2.c:653
uint64_t rit_flags
Definition: item.h:478
static void set_timeout(uint64_t milli)
Definition: formation2.c:197
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL bool m0_rpc_packet_is_carrying_item(const struct m0_rpc_packet *p, const struct m0_rpc_item *item)
Definition: packet.c:208
static struct m0_rpc_packet * packet_stack[STACK_SIZE]
Definition: formation2.c:85
const char * ts_name
Definition: ut.h:99
static struct m0_rpc_frm * frm
Definition: formation2.c:34
struct m0_sm_state_descr * scf_state
Definition: sm.h:356
m0_bcount_t fc_max_nr_bytes_accumulated
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
const struct m0_sm_conf outgoing_item_sm_conf
Definition: item.c:59
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_INTERNAL m0_bcount_t m0_rpc_packet_onwire_footer_size(void)
Definition: packet.c:68
M0_INTERNAL void m0_rpc_packet_discard(struct m0_rpc_packet *packet)
Definition: packet.c:134
static void m0_fi_enable_once(const char *func, const char *tag)
Definition: finject.h:301
static void item_put_noop(struct m0_rpc_item *item)
Definition: formation2.c:154
M0_INTERNAL void rpc_worker_thread_fn(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:379
struct m0_ut_suite frm_ut
Definition: formation2.c:659
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
Definition: item.c:813
void m0_free(void *data)
Definition: memory.c:146
static struct m0_rpc_item_type_ops twoway_item_type_ops
Definition: formation2.c:159
M0_INTERNAL m0_bcount_t m0_rpc_packet_onwire_header_size(void)
Definition: packet.c:54
static void check_frm(enum frm_state state, uint64_t nr_items, uint64_t nr_packets)
Definition: formation2.c:241
static struct m0_rpc_item_type twoway_item_type
Definition: formation2.c:39
int32_t rc
Definition: trigger_fop.h:47
const struct m0_sm_conf * sm_conf
Definition: sm.h:320
#define M0_UT_ASSERT(a)
Definition: ut.h:46
static struct m0_rpc_frm_ops frm_ops
Definition: formation2.c:126
m0_time_t ri_deadline
Definition: item.h:141
static bool packet_ready_called
Definition: formation2.c:108
static void frm_do_test5(const int N, const int ITEMS_PER_PACKET)
Definition: formation2.c:418
static int frm_ut_fini(void)
Definition: formation2.c:72