Motr  M0
item.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
24 #include "lib/trace.h"
25 
26 #define M0_UT_TRACE 0
27 
28 #include "ut/ut.h"
29 #include "lib/finject.h"
30 #include "lib/fs.h" /* m0_file_read */
31 #include "lib/misc.h" /* M0_BITS */
32 #include "lib/semaphore.h"
33 #include "lib/memory.h"
34 #include "rpc/rpclib.h"
35 #include "rpc/ut/clnt_srv_ctx.c" /* sctx, cctx. NOTE: This is .c file */
36 #include "rpc/ut/fops.h"
37 #include "rpc/rpc_internal.h"
38 
/*
 * Timeout constant shared by the tests in this file.
 * NOTE(review): the unit is not visible here (presumably seconds) -- confirm
 * against the call sites.
 */
enum { TIMEOUT = 4 };
42 
43 static int _test(void);
44 static void _test_timeout(m0_time_t deadline, m0_time_t timeout, bool reset);
45 static void _test_resend(struct m0_fop *fop, bool post_sync);
46 static void _test_timer_start_failure(void);
47 static void _ha_notify(struct m0_rpc_conn *conn, uint8_t state);
48 static void _ha_do_not_notify(struct m0_rpc_conn *conn, uint8_t state);
49 
51 static struct m0_fid expected_fid;
52 static struct m0_rpc_machine *machine;
53 static struct m0_rpc_session *session;
54 static struct m0_rpc_stats saved;
55 static struct m0_rpc_stats stats;
56 static struct m0_rpc_item *item;
57 static struct m0_fop *fop;
58 static int item_rc;
60 extern const struct m0_sm_conf outgoing_item_sm_conf;
61 extern const struct m0_sm_conf incoming_item_sm_conf;
62 
63 #define IS_INCR_BY_N(p, n) _0C(saved.rs_ ## p + (n) == stats.rs_ ## p)
64 #define IS_INCR_BY_1(p) IS_INCR_BY_N(p, 1)
65 
66 static int ts_item_init(void) /* ts_ for "test suite" */
67 {
74 
75  return 0;
76 }
77 
78 static int ts_item_fini(void)
79 {
82  return 0;
83 }
84 
85 static bool chk_state(const struct m0_rpc_item *item,
86  enum m0_rpc_item_state state)
87 {
88  return item->ri_sm.sm_state == state;
89 }
90 
91 static void test_simple_transitions(void)
92 {
93  int rc;
94 
95  /* TEST1: Simple request and reply sequence */
96  M0_LOG(M0_DEBUG, "TEST:1:START");
97  m0_rpc_machine_get_stats(machine, &saved, false /* clear stats? */);
99  item = &fop->f_item;
101  0 /* deadline */);
102  M0_UT_ASSERT(rc == 0);
103  M0_UT_ASSERT(item->ri_error == 0);
108  M0_UT_ASSERT(IS_INCR_BY_1(nr_sent_items) &&
109  IS_INCR_BY_1(nr_rcvd_items));
112  M0_LOG(M0_DEBUG, "TEST:1:END");
113 }
114 
116 {
118  m0_fi_disable("packet_ready", "set_reply_error");
119 }
120 
121 static void test_reply_item_error(void)
122 {
123  int rc;
124  struct m0_thread thread = {0};
125 
126  M0_LOG(M0_DEBUG, "TEST:1:START");
127  m0_rpc_machine_get_stats(machine, &saved, false /* clear stats? */);
128  fop = fop_alloc(machine);
129  item = &fop->f_item;
130  m0_fi_enable("packet_ready", "set_reply_error");
131  rc = M0_THREAD_INIT(&thread, int, NULL,
133  0, "disable_fi");
134  M0_UT_ASSERT(rc == 0);
135 
136  rc = m0_rpc_post_sync(fop, session, NULL, 0 /* deadline */);
137 
138  /* Error happens on server side, and client will try to resend fop.
139  * (M0_RPC_ITEM_RESEND_INTERVAL * 2 + 1) seconds later, server sends
140  * back reply successfully. */
141  M0_UT_ASSERT(rc == 0);
142  M0_UT_ASSERT(item->ri_error == 0);
145  M0_LOG(M0_DEBUG, "TEST:1:END");
147 }
148 
149 extern void (*m0_rpc__item_dropped)(struct m0_rpc_item *item);
150 
151 static struct m0_semaphore wait;
152 static void test_dropped(struct m0_rpc_item *item)
153 {
155 }
156 
157 static void test_timeout(void)
158 {
159  const struct m0_rpc_conn_ha_cfg *rchc_orig = session->s_conn->c_ha_cfg;
160  struct m0_rpc_conn_ha_cfg rchc_ut = *rchc_orig;
161  int rc;
162 
163  /* Test2.1: Request item times out before the reply reaches the sender.
164  The delayed reply is then dropped.
165  */
168  M0_LOG(M0_DEBUG, "TEST:2.1:START");
169  fop = fop_alloc(machine);
170  item = &fop->f_item;
171  item->ri_nr_sent_max = 1;
173  m0_fi_enable_once("cs_req_fop_fom_tick", "inject_delay");
174  m0_fi_enable_once("item_received", "drop_signal");
175  m0_semaphore_init(&wait, 0);
178  0 /* deadline */);
179  M0_UT_ASSERT(rc == -ETIMEDOUT);
180  M0_UT_ASSERT(item->ri_error == -ETIMEDOUT);
186  M0_UT_ASSERT(IS_INCR_BY_1(nr_dropped_items) &&
187  IS_INCR_BY_1(nr_timedout_items) &&
188  IS_INCR_BY_1(nr_failed_items));
192  M0_LOG(M0_DEBUG, "TEST:2.1:END");
193 
194  /* Test [ENQUEUED] ---timeout----> [FAILED] */
195  M0_LOG(M0_DEBUG, "TEST:2.2:START");
197  m0_time(0, 100 * M0_TIME_ONE_MSEC), true);
198  M0_LOG(M0_DEBUG, "TEST:2.2:END");
199 
200  /* Test [URGENT] ---timeout----> [FAILED] */
201  m0_fi_enable("frm_balance", "do_nothing");
202  M0_LOG(M0_DEBUG, "TEST:2.3:START");
204  m0_time(0, 100 * M0_TIME_ONE_MSEC), true);
205  m0_fi_disable("frm_balance", "do_nothing");
206  M0_LOG(M0_DEBUG, "TEST:2.3:END");
207 
208  /* Test: [SENDING] ---timeout----> [FAILED] */
209 
210  M0_LOG(M0_DEBUG, "TEST:2.4:START");
211  /* Delay "sent" callback for 300 msec. */
212  m0_fi_enable("buf_send_cb", "delay_callback");
213  /* ASSUMPTION: Sender will not get "new item received" event until
214  the thread that has called buf_send_cb()
215  comes out of sleep and returns to net layer.
216  */
218  m0_time(0, 100 * M0_TIME_ONE_MSEC), true);
219  /* wait until reply is processed */
221  M0_LOG(M0_DEBUG, "TEST:2.4:END");
222  m0_fi_disable("buf_send_cb", "delay_callback");
223  /* restore HA ops */
226 }
227 
228 static void _test_timeout(m0_time_t deadline,
230  bool reset)
231 {
232  fop = fop_alloc(machine);
233  item = &fop->f_item;
235  item->ri_nr_sent_max = 1;
237  m0_rpc_post_sync(fop, session, NULL, deadline);
238  M0_UT_ASSERT(item->ri_error == -ETIMEDOUT);
242  M0_UT_ASSERT(IS_INCR_BY_1(nr_timedout_items) &&
243  IS_INCR_BY_1(nr_failed_items));
246 }
247 
/**
 * Counting predicate: increments the int that @a data points to and reports
 * whether this was exactly the second increment.
 *
 * The signature matches drop_twice(), which this file hands to
 * m0_fi_enable_func() as a trigger callback; presumably this one is used
 * the same way -- confirm at the call site.
 *
 * @param data  pointer to an int call counter owned by the caller
 *
 * @return true only when the counter reaches 2
 */
static bool only_second_time(void *data)
{
	int *counter = data;
	const int calls = ++*counter;

	return calls == 2;
}
255 
/**
 * Counting predicate handed to m0_fi_enable_func() as a trigger callback:
 * increments the int that @a data points to and returns true for as long as
 * the incremented counter does not exceed 2.
 *
 * @param data  pointer to an int call counter owned by the caller
 *
 * @return true while the incremented counter is <= 2, false afterwards
 */
static bool drop_twice(void *data)
{
	int *counter = data;

	*counter += 1;
	return !(*counter > 2);
}
263 
/**
 * HA-notification stub that deliberately does nothing.
 *
 * It has the same signature as _ha_notify(), which this file installs as
 * rchc_ops.cho_ha_notify; presumably this stub is installed in the same slot
 * by tests that must not report anything to HA -- confirm at the call sites.
 *
 * @param conn   ignored
 * @param state  ignored
 */
static void _ha_do_not_notify(struct m0_rpc_conn *conn, uint8_t state)
{
	/* Intentionally empty: both arguments are unused. */
	(void)conn;
	(void)state;
}
267 
268 static void _ha_notify(struct m0_rpc_conn *conn, uint8_t state)
269 {
270  struct m0_conf_obj *svc_obj;
271 
272  M0_UT_ENTER();
273  /* make sure HA is to be called with expected parameters */
274  M0_UT_ASSERT(expected_state == state);
276  /*
277  * imitate reqh_service_ha_state_set() behavior while sending nothing to
278  * HA because there is no HA environment up and running to accept a note
279  */
281  svc_obj = m0_rpc_conn2svc(conn);
282  M0_UT_ASSERT(svc_obj != NULL);
283  svc_obj->co_ha_state = state;
284  /* toggle expected state */
287  M0_UT_RETURN();
288 }
289 
290 static void test_resend(void)
291 {
292  const struct m0_rpc_conn_ha_cfg *rchc_orig = session->s_conn->c_ha_cfg;
293  struct m0_rpc_conn_ha_cfg rchc_ut = *rchc_orig;
294  struct m0_rpc_item *item;
295  int rc;
296  int cnt = 0;
297  struct m0_fid sfid = M0_FID_TINIT('s', 1, 25);
299  struct m0_rconfc *cl_rconfc = &cctx.rcx_reqh.rh_rconfc;
300 
301  rc = m0_rconfc_init(cl_rconfc, m0_reqh2profile(reqh),
302  m0_locality0_get()->lo_grp, machine,
303  NULL, NULL);
304  M0_UT_ASSERT(rc == 0);
305  rc = m0_file_read(M0_UT_PATH("conf.xc"), &cl_rconfc->rc_local_conf);
306  M0_UT_ASSERT(rc == 0);
307  m0_rconfc_start(cl_rconfc);
309  M0_UT_ASSERT(rc == 0);
310 
312 
316  expected_fid = sfid;
317 
318  /* Test: Request is dropped. */
319  M0_LOG(M0_DEBUG, "TEST:3.1:START");
320  m0_fi_enable_once("item_received_fi", "drop_item");
321  _test_resend(NULL, true);
322  M0_LOG(M0_DEBUG, "TEST:3.1:END");
323 
324  /* Test: Reply is dropped. */
325  M0_LOG(M0_DEBUG, "TEST:3.2:START");
326  m0_fi_enable_func("item_received_fi", "drop_item",
328  _test_resend(NULL, true);
329  m0_fi_disable("item_received_fi", "drop_item");
330  M0_LOG(M0_DEBUG, "TEST:3.2:END");
331 
332  /* Test: ENQUEUED -> REPLIED transition.
333 
334  Reply is delayed. On sender, request item is enqueued for
335  resending. But before formation could send the item,
336  reply is received.
337 
338  nanosleep()s are inserted at specific points to create
339  this scenario:
340  - request is sent;
341  - the request is moved to WAITING_FOR_REPLY state;
342  - the item's timer is set to trigger after 1 sec;
343  - fault_point<"m0_rpc_reply_post", "delay_reply"> delays
344  sending reply by 1.2 sec;
345  - resend timer of request item triggers and calls
346  m0_rpc_item_send();
347  - fault_point<"m0_rpc_item_send", "advance_deadline"> moves
348  the deadline of the request item 500ms into the future, so the
349  item moves to ENQUEUED state when handed over to formation;
350  - receiver comes out of 1.2 sec sleep and sends reply.
351  */
352  M0_LOG(M0_DEBUG, "TEST:3.3:START");
353  cnt = 0;
354  m0_fi_enable_func("m0_rpc_item_send", "advance_deadline",
356  m0_fi_enable_once("m0_rpc_reply_post", "delay_reply");
357  fop = fop_alloc(machine);
358  item = &fop->f_item;
359  _test_resend(fop, true);
360  m0_fi_disable("m0_rpc_item_send", "advance_deadline");
362  M0_LOG(M0_DEBUG, "TEST:3.3:END");
363 
364  M0_LOG(M0_DEBUG, "TEST:3.4:START");
365  /* CONTINUES TO USE fop/item FROM PREVIOUS TEST-CASE. */
366  /* RPC call is complete i.e. item is in REPLIED state.
367  Explicitly resend the completed request; the way the item
368  will be resent during recovery.
369  */
377  M0_UT_ASSERT(rc == 0);
378  M0_UT_ASSERT(item->ri_error == 0);
384  M0_LOG(M0_DEBUG, "TEST:3.4:END");
385 
386  /* Test: INITIALISED -> FAILED transition when m0_rpc_post()
387  fails to start item timer.
388  */
389  M0_LOG(M0_DEBUG, "TEST:3.5.1:START");
390  m0_fi_enable_once("m0_rpc_item_timer_start", "failed");
392  M0_LOG(M0_DEBUG, "TEST:3.5.1:END");
393 
394  /* Test: Move item from WAITING_FOR_REPLY to FAILED state if
395  item_sent() fails to start resend timer.
396  */
397  M0_LOG(M0_DEBUG, "TEST:3.5.2:START");
398  cnt = 0;
399  m0_fi_enable_func("m0_rpc_item_timer_start", "failed",
401  m0_fi_enable("item_received_fi", "drop_item");
403  m0_fi_disable("item_received_fi", "drop_item");
404  m0_fi_disable("m0_rpc_item_timer_start", "failed");
406  /*
407  * Check that HA was notified about the delay for a response from the
408  * service on the rpc item.
409  */
411  stats.rs_nr_ha_timedout_items);
413  stats.rs_nr_ha_noted_conns);
414  M0_LOG(M0_DEBUG, "TEST:3.5.2:END");
415  /* restore HA configuration */
417  /* clean up */
420  m0_rconfc_stop_sync(cl_rconfc);
421  m0_rconfc_fini(cl_rconfc);
423 }
424 
425 static void _test_resend(struct m0_fop *fop, bool post_sync)
426 {
427  bool fop_put_flag = false;
428  int rc;
429 
430  if (fop == NULL) {
431  fop = fop_alloc(machine);
432  fop_put_flag = true;
433  }
434  item = &fop->f_item;
435  if (post_sync) {
436  rc = m0_rpc_post_sync(fop, session, NULL, 0 /* urgent */);
437  }
438  else {
440  item->ri_deadline = 0;
441  rc = m0_rpc_post(item);
442  }
443  M0_UT_ASSERT(rc == 0);
444  M0_UT_ASSERT(item->ri_error == 0);
446  if (post_sync) {
449  }
450  if (fop_put_flag && post_sync) {
453  }
454 }
455 
456 static void _test_timer_start_failure(void)
457 {
458  int rc;
459 
460  fop = fop_alloc(machine);
461  item = &fop->f_item;
462  rc = m0_rpc_post_sync(fop, session, NULL, 0 /* urgent */);
463  M0_UT_ASSERT(rc == -EINVAL);
464  M0_UT_ASSERT(item->ri_error == -EINVAL);
467  /* sleep until the request reaches the server and is dropped */
468  m0_nanosleep(m0_time(0, 5 * 1000 * 1000), NULL);
471 }
472 
474 {
475  int rc;
476  int i;
477  struct /* anonymous */ {
478  const char *func;
479  const char *tag;
480  int rc;
481  } fp[] = {
482  {"m0_bufvec_alloc_aligned", "oom", -ENOMEM},
483  {"m0_net_buffer_register", "fake_error", -EINVAL},
484  {"m0_rpc_packet_encode", "fake_error", -EFAULT},
485  {"m0_net_buffer_add", "fake_error", -EMSGSIZE},
486  };
487 
488  /* TEST4: packet_ready() routine failed.
489  The item should move to FAILED state.
490  */
491  for (i = 0; i < ARRAY_SIZE(fp); ++i) {
492  M0_LOG(M0_DEBUG, "TEST:4.%d:START", i + 1);
493  m0_fi_enable_once(fp[i].func, fp[i].tag);
494  rc = _test();
495  M0_UT_ASSERT(rc == fp[i].rc);
496  M0_UT_ASSERT(item_rc == fp[i].rc);
497  M0_LOG(M0_DEBUG, "TEST:4.%d:END", i + 1);
499  }
500  /* TEST5: Network layer reported buffer send failure.
501  The item should move to FAILED state.
502  NOTE: Buffer sending is successful, hence we need
503  to explicitly drop the item on receiver using
504  fault_point<"item_received_fi", "drop_item">.
505  */
506  M0_LOG(M0_DEBUG, "TEST:5:START");
507  m0_fi_enable("buf_send_cb", "fake_err");
508  m0_fi_enable("item_received_fi", "drop_item");
509  rc = _test();
510  M0_UT_ASSERT(rc == -EINVAL);
511  M0_UT_ASSERT(item_rc == -EINVAL);
513  m0_fi_disable("buf_send_cb", "fake_err");
514  m0_fi_disable("item_received_fi", "drop_item");
515  M0_LOG(M0_DEBUG, "TEST:5:END");
517 }
518 
519 static int _test(void)
520 {
521  int rc;
522 
523  /* Check SENDING -> FAILED transition */
525  fop = fop_alloc(machine);
526  item = &fop->f_item;
528  0 /* deadline */);
530  item_rc = item->ri_error;
533  M0_UT_ASSERT(IS_INCR_BY_1(nr_failed_items));
535  return rc;
536 }
537 
/*
 * Flag raised by arrow_sent_cb(). test_oneway_item() clears it before the
 * test and asserts on it afterwards. Static storage, so it starts as false.
 */
static bool arrow_sent_cb_called;

/**
 * Item callback that only records that it has been invoked.
 *
 * @param item  ignored
 */
static void arrow_sent_cb(struct m0_rpc_item *item)
{
	(void)item;
	arrow_sent_cb_called = true;
}
543 static const struct m0_rpc_item_ops arrow_item_ops = {
545 };
546 
/* Raised by fop_release() so a test can tell that the release hook ran. */
static bool fop_release_called;

/**
 * Reference-release hook: records the call in fop_release_called and then
 * passes @a ref on to m0_fop_release().
 *
 * @param ref  reference being released; forwarded unchanged
 */
static void fop_release(struct m0_ref *ref)
{
	/* Set the flag first; ref is handed to m0_fop_release() afterwards. */
	fop_release_called = true;

	m0_fop_release(ref);
}
553 
554 static void test_oneway_item(void)
555 {
556  struct m0_rpc_item *item;
557  struct m0_fop *fop;
558  bool ok;
559  int rc;
560 
561  arrow_sent_cb_called = false;
562  /* Test 1: Confirm one-way items reach receiver */
563  M0_LOG(M0_DEBUG, "TEST:6.1:START");
565  M0_UT_ASSERT(fop != NULL);
566 
567  item = &fop->f_item;
569  item->ri_deadline = 0;
576  M0_TIME_NEVER);
577  M0_UT_ASSERT(rc == 0);
580 
582  M0_UT_ASSERT(ok);
583 
585  M0_UT_ASSERT(ok);
586 
589  M0_LOG(M0_DEBUG, "TEST:6.1:END");
590 
591  /* Test 2: Remaining queued oneway items are dropped during
592  m0_rpc_frm_fini()
593  */
594  M0_LOG(M0_DEBUG, "TEST:6.2:START");
595  M0_ALLOC_PTR(fop);
596  M0_UT_ASSERT(fop != NULL);
599  M0_UT_ASSERT(rc == 0);
600  item = &fop->f_item;
609  m0_fi_enable("frm_fill_packet", "skip_oneway_items");
610  /* stop client server to trigger m0_rpc_frm_fini() */
612  M0_UT_ASSERT(arrow_sent_cb_called); /* callback with FAILED items */
615  m0_fi_disable("frm_fill_packet", "skip_oneway_items");
616  M0_LOG(M0_DEBUG, "TEST:6.2:END");
617 }
618 
619 /*
620 static void rply_before_sentcb(void)
621 {
622  @todo Simulate a case where:
623  - Request item A is serialised in network buffer NB_A;
624  - NB_A is submitted to net layer;
625  - A is in SENDING state;
626  - NB_A.sent() callback is not yet received;
627  - And reply to A is received.
628  In this case reply processing of A should be postponed until
629  NB_A.sent() callback is invoked.
630 
631  Tried to simulate this case by introducing an artificial delay in
632  buf_send_cb(). But because only one thread of the lnet transport
633  delivers buffer events, the delay also blocks delivery of the
634  net_buf_received(A.reply) event.
635 }
636 */
637 
639 bool ALREADY_REPLIED = true;
640 
641 void fop_test(int expected_rc)
642 {
643  int rc;
644 
645  fop = fop_alloc(machine);
646  item = &fop->f_item;
650  rc = m0_rpc_post_sync(fop, session, NULL, 0 /* deadline */);
651  if (expected_rc == 0) {
652  M0_UT_ASSERT(rc == 0);
653  M0_UT_ASSERT(item->ri_error == 0);
656  } else {
657  M0_UT_ASSERT(rc == -ECANCELED);
658  M0_UT_ASSERT(rc == expected_rc);
659  M0_UT_ASSERT(item->ri_error == expected_rc);
662  }
665 }
666 
667 static void check_cancel(bool already_replied, bool reinitialise)
668 {
669  uint64_t xid = item->ri_header.osr_xid;
670  int rc;
671 
673 
674  if (reinitialise)
676  else {
678  if (already_replied) {
679  /* Item already replied. Hence, not cancelled. */
680  M0_UT_ASSERT(item->ri_error == 0);
682  return;
683  }
684  }
685 
686  /* Verify that the item was indeed cancelled. */
688  if (reinitialise) {
689  M0_UT_ASSERT(item->ri_error == 0);
691 
692  /* Re-post the item that was re-initialised. */
694  0 /* deadline */);
695  M0_UT_ASSERT(rc == 0);
696  M0_UT_ASSERT(item->ri_error == 0);
701  } else {
702  M0_UT_ASSERT(item->ri_error == -ECANCELED);
704  }
706 }
707 
708 static void cancel_item_with_various_states(bool reinitialise)
709 {
710  int rc;
711  int sub_tc = reinitialise ? 1 : 2;
712 
713  M0_LOG(M0_DEBUG, "TEST:7:%d:START", sub_tc);
714 
715  /*
716  * Cancel item that is already replied.
717  * In this case, m0_rpc_item_cancel() is a no-op.
718  */
719  M0_LOG(M0_DEBUG, "TEST:7:%d:1:START", sub_tc);
720  fop = fop_alloc(machine);
721  item = &fop->f_item;
722 
724  0 /* deadline */);
725  M0_UT_ASSERT(rc == 0);
726  M0_UT_ASSERT(item->ri_error == 0);
730  m0_fi_enable_once("item_cancel_fi", "cancel_replied_item");
731  check_cancel(ALREADY_REPLIED, reinitialise);
734  M0_LOG(M0_DEBUG, "TEST:7:%d:1:END", sub_tc);
735 
736  /* Cancel item while in formation. */
737  M0_LOG(M0_DEBUG, "TEST:7:%d:2:START", sub_tc);
738  fop = fop_alloc(machine);
739  item = &fop->f_item;
743  rc = m0_rpc_post(item);
744  M0_UT_ASSERT(rc == 0);
747  m0_fi_enable_once("item_cancel_fi", "cancel_enqueued_item");
748  check_cancel(!ALREADY_REPLIED, reinitialise);
751  M0_LOG(M0_DEBUG, "TEST:7:%d:2:END", sub_tc);
752 
753  /* Cancel while item is in SENDING state. */
754  M0_LOG(M0_DEBUG, "TEST:7:%d:3:START", sub_tc);
755  m0_fi_enable("buf_send_cb", "delay_callback");
756  fop = fop_alloc(machine);
757  item = &fop->f_item;
761  rc = m0_rpc_post(item);
762  M0_UT_ASSERT(rc == 0);
765  M0_TIME_NEVER);
766  M0_UT_ASSERT(rc == 0);
767  m0_fi_enable_once("item_cancel_fi", "cancel_sending_item");
768  check_cancel(!ALREADY_REPLIED, reinitialise);
769  m0_fi_disable("buf_send_cb", "delay_callback");
772  M0_LOG(M0_DEBUG, "TEST:7:%d:3:END", sub_tc);
773 
774  /*
775  * Cancel while waiting for reply.
776  * If reply is received for this request after cancelation, then it
777  * is dropped and is recorded using a record of the kind:
778  * item_received] 0x.. [REPLY/6] dropped
779  */
780  M0_LOG(M0_DEBUG, "TEST:7:%d:4:START", sub_tc);
781  m0_fi_enable_once("cs_req_fop_fom_tick", "inject_delay");
782  fop = fop_alloc(machine);
783  item = &fop->f_item;
787  rc = m0_rpc_post(item);
788  M0_UT_ASSERT(rc == 0);
790  M0_TIME_NEVER);
791  M0_UT_ASSERT(rc == 0);
793  m0_fi_enable_once("item_cancel_fi", "cancel_waiting_for_reply_item");
794  check_cancel(!ALREADY_REPLIED, reinitialise);
797  M0_LOG(M0_DEBUG, "TEST:7:%d:4:END", sub_tc);
798 
799  M0_LOG(M0_DEBUG, "TEST:7:%d:END", sub_tc);
800 }
801 
802 static void test_cancel_item(void)
803 {
804  /*
805  * Cancel item along with sending 'reinitialise = true' to
806  * m0_rpc_item_cancel().
807  */
809 
810  /*
811  * Cancel item along with sending 'reinitialise = false' to
812  * m0_rpc_item_cancel().
813  */
815 }
816 
817 uint32_t fop_dispatched_nr = 0;
818 /*
819  * This is to help keep track of how many fops issued around session
820  * cancelation have been taken to completion, may it be with success
821  * or failure.
822  */
823 static void session_ut_item_cb(struct m0_rpc_item *item)
824 {
826 }
827 
828 static const struct m0_rpc_item_ops session_ut_item_ops = {
830 };
831 
832 static void test_cancel_session(void)
833 {
834  uint32_t fop_nr = 10;
835  uint32_t fop_cancelled_nr = 0;
836  struct m0_fop *fop_arr[fop_nr];
837  uint32_t i;
838  int rc;
839 
840  /*
841  * Cancel rpc session while it may have items in various states like
842  * ENQUEUED, URGENT, SENDING, SENT or WAITING_FOR_REPLY. Replies are
843  * dropped for the items applicable.
844  */
845  M0_LOG(M0_DEBUG, "TEST:8:1:START");
846  m0_fi_enable("item_received_fi", "drop_item_reply");
847  for (i = 0; i < fop_nr; ++i) {
848  fop = fop_alloc(machine);
849  item = &fop->f_item;
854  rc = m0_rpc_post(item);
855  M0_UT_ASSERT(rc == 0);
857  fop_arr[i] = fop;
859  }
860 
861  M0_LOG(M0_DEBUG, "TEST:8:1:2: cancel session");
864 
865  for (i = 0; i < fop_nr; ++i) {
866  item = &fop_arr[i]->f_item;
867  M0_UT_ASSERT(m0_ref_read(&fop_arr[i]->f_ref) == 1);
871  M0_UT_ASSERT(item->ri_error == -ECANCELED);
872  ++fop_cancelled_nr;
873  }
874  m0_fop_put_lock(fop_arr[i]);
875  }
876  M0_UT_ASSERT(fop_cancelled_nr > 0);
877 
878  /*
879  * Post a fop to verify that it gets cancelled while in the INITIALISED
880  * state, the session being cancelled.
881  */
882  fop_test(-ECANCELED);
883  m0_fi_disable("item_received_fi", "drop_item_reply");
884 
885  M0_LOG(M0_DEBUG, "TEST:8:1:2: restore session");
886  /*
887  * In a production scenario, the session would be restored through a
888  * service reconnect. Since this is a UT, we simply destroy and recreate
889  * the session.
890  */
891  M0_UT_ASSERT(session->s_cancelled == true);
892  M0_UT_ASSERT(session->s_xid > 0);
894  M0_UT_ASSERT(rc == 0);
897  M0_UT_ASSERT(rc == 0);
898  M0_UT_ASSERT(session->s_cancelled == false);
899  M0_UT_ASSERT(session->s_xid == 0);
900 
901  /*
902  * Post a fop successfully to verify that the session has been restored.
903  * This also ensures that 'the items received on the receiver which have
904  * been cancelled on the sender side' have been taken to completion.
905  * Otherwise, stop_rpc_client_and_server() may hang in a rare case.
906  */
907  fop_test(0);
908 
909  M0_LOG(M0_DEBUG, "TEST:8:1:END");
910 }
911 
912 enum {
914 };
915 
918 
920 {
925 }
926 
928 {
933 }
934 
935 extern const struct m0_sm_conf outgoing_item_sm_conf;
936 extern const struct m0_sm_conf incoming_item_sm_conf;
937 
940  .rito_item_put = test_item_cache_item_put,
941 };
942 static struct m0_rpc_item_type test_item_cache_itype = {
944 };
945 
946 /*
947  * Add each nth item to the cache.
948  * Lookup each item in cache.
949  * Then remove each item from the cache.
950  */
952  struct m0_mutex *lock,
953  struct m0_rpc_item *items,
954  int items_nr,
955  int n)
956 {
957  struct m0_rpc_item *item;
958  int added_nr;
959  int test_nr;
960  int i;
961 
962  M0_SET0(ic);
964  added_nr = 0;
965  for (i = 0; i < items_nr; ++i) {
966  if ((i % n) == 0) {
968  m0_rpc_item_cache_add(ic, &items[i], M0_TIME_NEVER);
969  ++added_nr;
970  }
971  }
972  /* no-op */
974  for (i = 0; i < items_nr; ++i) {
975  /* do nothing */
976  if ((i % n) == 0)
977  m0_rpc_item_cache_add(ic, &items[i], M0_TIME_NEVER);
978  }
979  test_nr = 0;
980  for (i = 0; i < items_nr; ++i) {
982  /* m0_rpc_item_cache_lookup() returns either NULL or item */
983  M0_UT_ASSERT(item == NULL || item == &items[i]);
984  M0_UT_ASSERT(equi(item != NULL, (i % n) == 0));
985  test_nr += item != NULL;
986  }
987  M0_UT_ASSERT(test_nr == added_nr);
988  for (i = 0; i < items_nr; ++i) {
989  if ((i % n) == 0)
992  }
993  /* cache is empty now */
994  /* do nothing */
996  for (i = 0; i < items_nr; ++i) {
998  M0_UT_ASSERT(item == NULL);
999  }
1001 }
1002 
1003 static void test_item_cache(void)
1004 {
1005  struct m0_rpc_item_cache ic = {};
1006  struct m0_rpc_machine rmach = {};
1007  struct m0_rpc_item *items;
1008  struct m0_mutex lock = {};
1009  int items_nr;
1010  int n;
1011  int i;
1012 
1014  M0_UT_ASSERT(items != NULL);
1015  for (i = 0; i < M0_RPC_ITEM_CACHE_ITEMS_NR_MAX; ++i) {
1017  items[i].ri_header.osr_xid = i;
1018  items[i].ri_rmachine = &rmach;
1019  }
1020  m0_mutex_init(&lock);
1021  m0_mutex_lock(&lock);
1022  /*
1023  * This is needed because m0_rpc_item_put() checks rpc machine lock.
1024  */
1025  m0_mutex_init(&rmach.rm_sm_grp.s_lock);
1026  m0_mutex_lock(&rmach.rm_sm_grp.s_lock);
1027  for (items_nr = 1;
1028  items_nr < M0_RPC_ITEM_CACHE_ITEMS_NR_MAX; ++items_nr) {
1029  for (n = 1; n <= items_nr; ++n)
1030  test_item_cache_add_nth(&ic, &lock, items, items_nr, n);
1031  }
1033  m0_mutex_fini(&rmach.rm_sm_grp.s_lock);
1035  m0_mutex_fini(&lock);
1036  for (i = 0; i < M0_RPC_ITEM_CACHE_ITEMS_NR_MAX; ++i)
1037  m0_rpc_item_fini(&items[i]);
1038  m0_free(items);
1039 }
1040 
1041 static struct m0_thread ha_thread = {0};
1043 
1044 void __ha_accept_imitate(struct m0_fid *sfid)
1045 {
1047  struct m0_confc *confc = m0_reqh2confc(reqh);
1048  struct m0_rconfc *cl_rconfc = &cctx.rcx_reqh.rh_rconfc;
1049  struct m0_conf_obj *obj;
1050 
1051  M0_ENTRY("fid "FID_F, FID_P(sfid));
1054  NULL);
1055  /* Update HA state of the service in client cache and server cache */
1057  M0_UT_ASSERT(obj != NULL);
1058  obj->co_ha_state = M0_NC_FAILED;
1059  m0_chan_broadcast_lock(&obj->co_ha_chan);
1060  obj = m0_conf_cache_lookup(&cl_rconfc->rc_confc.cc_cache, sfid);
1061  M0_UT_ASSERT(obj != NULL);
1062  obj->co_ha_state = M0_NC_FAILED;
1063  m0_chan_broadcast_lock(&obj->co_ha_chan);
1064  M0_UT_RETURN("broadcast done");
1065 }
1066 
1067 static void __ha_timer__dummy(struct m0_sm_timer *timer)
1068 {
1069  struct m0_rpc_conn *conn;
1070  struct m0_conf_obj *obj;
1071 
1072  M0_UT_ENTER();
1073  conn = container_of(timer, struct m0_rpc_conn, c_ha_timer);
1076  M0_LOG(M0_DEBUG, "obj = %p, fid "FID_F, obj, FID_P(&obj->co_id));
1077  M0_UT_ASSERT(obj->co_ha_state == M0_NC_FAILED);
1079  M0_UT_RETURN();
1080 }
1081 
/**
 * Local clink callback: forwards @a link to the saved callback in
 * rpc_conn_original_ha_cb, logs the result and returns it unchanged.
 *
 * @param link  clink the event was delivered on; forwarded as-is
 *
 * @return whatever rpc_conn_original_ha_cb() returned
 */
static bool __ha_service_event(struct m0_clink *link)
{
	bool cb_rc;

	M0_UT_ENTER();

	/* Delegate the actual event handling to the saved callback. */
	cb_rc = rpc_conn_original_ha_cb(link);
	M0_UT_LOG("rc = %d", cb_rc);

	M0_UT_RETURN();
	return cb_rc;
}
1092 
1093 static void test_ha_cancel(void)
1094 {
1095  const struct m0_rpc_conn_ha_cfg *rchc_orig = session->s_conn->c_ha_cfg;
1096  struct m0_rpc_conn_ha_cfg rchc_ut = *rchc_orig;
1098  struct m0_confc *confc = &reqh->rh_rconfc.rc_confc;
1099  struct m0_fid sfid = M0_FID_TINIT('s', 1, 25);
1100  struct m0_rconfc *cl_rconfc = &cctx.rcx_reqh.rh_rconfc;
1101  struct m0_conf_obj *obj;
1102  int rc;
1103 
1104  M0_SET0(&ha_thread);
1105  rc = m0_rconfc_init(cl_rconfc, m0_reqh2profile(reqh),
1106  m0_locality0_get()->lo_grp, machine,
1107  NULL, NULL);
1108  M0_UT_ASSERT(rc == 0);
1109  rc = m0_file_read(M0_UT_PATH("conf.xc"), &cl_rconfc->rc_local_conf);
1110  M0_UT_ASSERT(rc == 0);
1111  m0_rconfc_start(cl_rconfc);
1112 
1113  /*
1114  * Re-initiate rpc conn subscription to HA notes. This will replace
1115  * original ha clink callback with the local one. We need to follow
1116  * work flow, but take detours for the sake of test passage.
1117  */
1121  /*
1122  * We are going to imitate service death notification before connection
1123  * getting timed out. Need to intercept standard item's ha timer
1124  * callback to prevent sending temporary failure status to HA as we have
1125  * no real HA environment running
1126  */
1130  M0_UT_ASSERT(rc == 0);
1131  /* imitate external HA note acceptance */
1132  rc = M0_THREAD_INIT(&ha_thread, struct m0_fid *, NULL,
1133  &__ha_accept_imitate, &sfid, "death_note");
1134  M0_UT_ASSERT(rc == 0);
1135 
1136  /* send fop with delay enabled */
1137  fop = fop_alloc(machine);
1138  item = &fop->f_item;
1139  item->ri_nr_sent_max = 2;
1141  m0_fi_enable_once("cs_req_fop_fom_tick", "inject_delay");
1142  m0_semaphore_init(&wait, 0);
1143  M0_UT_LOG("posting item = %p", item);
1145  0 /* deadline */);
1146  M0_UT_LOG("done with posting");
1147  M0_UT_ASSERT(rc == -ECANCELED);
1148  M0_UT_ASSERT(item->ri_error == -ECANCELED);
1154  /* restore HA ops */
1155  m0_rpc_conn_ha_cfg_set(session->s_conn, rchc_orig);
1156  /* recover connection */
1158  M0_UT_ASSERT(rc == -ECANCELED);
1159  m0_rconfc_stop_sync(cl_rconfc);
1160  m0_rconfc_fini(cl_rconfc);
1162  M0_UT_ASSERT(rc == 0);
1163  /* recover service object */
1164  obj = m0_conf_cache_lookup(&confc->cc_cache, &sfid);
1165  M0_UT_ASSERT(obj != NULL);
1166  obj->co_ha_state = M0_NC_ONLINE;
1167 }
1168 
1169 static void test_ha_notify()
1170 {
1171  const struct m0_rpc_conn_ha_cfg *rchc_orig = session->s_conn->c_ha_cfg;
1172  struct m0_rpc_conn_ha_cfg rchc_ut = *rchc_orig;
1173  struct m0_fid sfid = M0_FID_TINIT('s', 1, 25);
1175  struct m0_rconfc *cl_rconfc = &cctx.rcx_reqh.rh_rconfc;
1176  struct m0_fop *fop2;
1177  int cnt = 0;
1178  int rc;
1179 
1180  rc = m0_rconfc_init(cl_rconfc, m0_reqh2profile(reqh),
1181  m0_locality0_get()->lo_grp, machine,
1182  NULL, NULL);
1183  M0_UT_ASSERT(rc == 0);
1184  rc = m0_file_read(M0_UT_PATH("conf.xc"), &cl_rconfc->rc_local_conf);
1185  M0_UT_ASSERT(rc == 0);
1186  rc = m0_rconfc_start(cl_rconfc);
1187  M0_UT_ASSERT(rc == 0);
1189  M0_UT_ASSERT(rc == 0);
1190 
1191  rchc_ut.rchc_ops.cho_ha_notify = _ha_notify;
1194  expected_fid = sfid;
1195 
1197  /* Test: one item resend, HA must be notified */
1198  M0_LOG(M0_DEBUG, "TEST:3.1:START");
1199  m0_fi_enable_once("item_received_fi", "drop_item");
1200  _test_resend(NULL, true);
1202  /*
1203  * rs_nr_ha_noted_conns grows by 2 because the scenario is as follows:
1204  * 1. item_resend() is triggered.
1205  * 2. Notify HA about M0_NC_TRANSIENT state.
1206  * 3. The reply is received after one resend.
1207  * 4. Notify HA about M0_NC_ONLINE state.
1208  */
1209  M0_UT_ASSERT(IS_INCR_BY_N(nr_ha_noted_conns, 2) &&
1210  IS_INCR_BY_1(nr_resent_items));
1211  M0_LOG(M0_DEBUG, "TEST:3.1:END");
1212 
1214  /*
1215  * Test: an item is resent twice, but HA must be notified only once
1216  * about M0_NC_TRANSIENT state and M0_NC_ONLINE state when the reply is
1217  * received.
1218  */
1219  M0_LOG(M0_DEBUG, "TEST:3.2:START");
1220  m0_fi_enable_func("item_received_fi", "drop_item",
1221  drop_twice, &cnt);
1222  _test_resend(NULL, true);
1223  m0_fi_disable("item_received_fi", "drop_item");
1225  M0_UT_ASSERT(IS_INCR_BY_N(nr_ha_noted_conns, 2) &&
1226  IS_INCR_BY_N(nr_resend_attempts, 2));
1227  M0_LOG(M0_DEBUG, "TEST:3.2:END");
1228 
1230  /*
1231  * Test: two items are resent, but HA must be notified only once about
1232  * M0_NC_TRANSIENT state and M0_NC_ONLINE state when the reply is
1233  * received.
1234  */
1235  M0_LOG(M0_DEBUG, "TEST:3.3:START");
1236  cnt = 0;
1237  m0_fi_enable_func("item_received_fi", "drop_item",
1238  drop_twice, &cnt);
1239  fop = fop_alloc(machine);
1240  fop2 = fop_alloc(machine);
1241  _test_resend(fop, false);
1242  _test_resend(fop2, false);
1244  M0_UT_ASSERT(rc == 0);
1246  M0_UT_ASSERT(rc == 0);
1247  m0_fi_disable("item_received_fi", "drop_item");
1249  M0_UT_ASSERT(fop2->f_item.ri_nr_sent == 2);
1251  m0_fop_put_lock(fop2);
1253  M0_UT_ASSERT(IS_INCR_BY_N(nr_ha_noted_conns, 2) &&
1254  IS_INCR_BY_N(nr_resent_items, 2));
1255  M0_LOG(M0_DEBUG, "TEST:3.3:END");
1256  /*
1257  * Test: item_timeout() occurs, notify HA about M0_NC_TRANSIENT state.
1258  */
1261  m0_time(0, 100 * M0_TIME_ONE_MSEC), false);
1265  M0_UT_ASSERT(IS_INCR_BY_1(nr_ha_noted_conns));
1267  m0_time(0, 100 * M0_TIME_ONE_MSEC), false);
1271  M0_UT_ASSERT(saved.rs_nr_ha_noted_conns == stats.rs_nr_ha_noted_conns);
1272  /*
1273  * Report about M0_NC_ONLINE state if a reply was received for another
1274  * item after timeout happens.
1275  */
1277  fop = fop_alloc(machine);
1278  rc = m0_rpc_post_sync(fop, session, NULL, 0 /* urgent */);
1279  M0_UT_ASSERT(rc == 0);
1282  M0_UT_ASSERT(IS_INCR_BY_1(nr_ha_noted_conns));
1283  /* restore HA ops */
1284  m0_rpc_conn_ha_cfg_set(session->s_conn, rchc_orig);
1285  /* clean up */
1288  m0_rconfc_stop_sync(cl_rconfc);
1289  m0_rconfc_fini(cl_rconfc);
1290 }
1291 
1293  .ts_name = "rpc-item-ut",
1294  .ts_init = ts_item_init,
1295  .ts_fini = ts_item_fini,
1296  .ts_tests = {
1297  { "cache", test_item_cache },
1298  { "simple-transitions", test_simple_transitions },
1299  { "reply-item-error", test_reply_item_error },
1300  { "item-timeout", test_timeout },
1301  { "item-resend", test_resend },
1302  { "failure-before-sending", test_failure_before_sending },
1303  { "oneway-item", test_oneway_item },
1304  { "cancel", test_cancel_item },
1305  { "ha-cancel", test_ha_cancel },
1306  { "cancel-session", test_cancel_session },
1307  { "ha-notify", test_ha_notify },
1308  { NULL, NULL },
1309  }
1310 };
1311 
1312 #undef M0_TRACE_SUBSYSTEM
1313 
1314 /*
1315  * Local variables:
1316  * c-indentation-style: "K&R"
1317  * c-basic-offset: 8
1318  * tab-width: 8
1319  * fill-column: 80
1320  * scroll-step: 1
1321  * End:
1322  */
#define M0_UT_ENTER(...)
Definition: ut.h:56
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
static struct m0_mutex lock
Definition: transmit.c:326
static struct m0_rpc_stats saved
Definition: item.c:54
m0_time_t ri_resend_interval
Definition: item.h:144
M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:169
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static struct m0_semaphore wait
Definition: item.c:151
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static uint64_t test_item_cache_item_get_xid
Definition: item.c:916
#define M0_UT_LOG(...)
Definition: ut.h:57
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
const struct m0_sm_conf incoming_item_sm_conf
Definition: item.c:60
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
char * rc_local_conf
Definition: rconfc.h:390
#define NULL
Definition: misc.h:38
static void test_item_cache(void)
Definition: item.c:1003
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
m0_chan_cb_t rpc_conn_original_ha_cb
Definition: item.c:1042
static const struct m0_rpc_item_ops session_ut_item_ops
Definition: item.c:828
struct m0_semaphore arrow_hit
Definition: fops.c:41
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
Definition: sm.h:350
static void test_item_cache_add_nth(struct m0_rpc_item_cache *ic, struct m0_mutex *lock, struct m0_rpc_item *items, int items_nr, int n)
Definition: item.c:951
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
const struct m0_rpc_item_type_ops * rit_ops
Definition: item.h:476
M0_INTERNAL int m0_rpc_session_create(struct m0_rpc_session *session, struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: session.c:352
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
Definition: cache.c:106
static void test_item_cache_item_get(struct m0_rpc_item *item)
Definition: item.c:919
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_semaphore_timeddown(struct m0_semaphore *semaphore, const m0_time_t abs_timeout)
Definition: semaphore.c:75
#define M0_LOG(level,...)
Definition: trace.h:167
M0_INTERNAL int m0_file_read(const char *path, char **out)
Definition: fs.c:61
static void fop_alloc(struct m0_fom *fom, enum cob_fom_type fomtype)
Definition: cob_foms.c:624
uint64_t rs_nr_ha_timedout_items
Definition: rpc_machine.h:64
uint64_t rs_nr_ha_noted_conns
Definition: rpc_machine.h:65
static int ts_item_fini(void)
Definition: item.c:78
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
int m0_rpc_session_destroy(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:559
struct m0_sm ri_sm
Definition: item.h:181
struct m0_bufvec data
Definition: di.c:40
M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
Definition: item.c:1566
void disable_packet_ready_set_reply_error(int arg)
Definition: item.c:115
M0_INTERNAL void m0_rconfc_stop_sync(struct m0_rconfc *rconfc)
Definition: rconfc.c:2995
int32_t ri_error
Definition: item.h:161
static struct m0_rpc_client_ctx cctx
Definition: rconfc.c:69
struct m0_sm_conf rit_incoming_conf
Definition: item.h:483
#define M0_BITS(...)
Definition: misc.h:236
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
static void test_ha_cancel(void)
Definition: item.c:1093
static struct m0_rpc_item_type test_item_cache_itype
Definition: item.c:59
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
Definition: ut.h:77
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
void m0_rpc_item_init(struct m0_rpc_item *item, const struct m0_rpc_item_type *itype)
Definition: item.c:364
static struct m0_rpc_item * item
Definition: item.c:56
void fop_test(int expected_rc)
Definition: item.c:641
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
Definition: item.c:824
static struct foo * obj
Definition: tlist.c:302
m0_rpc_item_state
Definition: item.h:58
Definition: item.c:40
bool s_cancelled
Definition: session.h:360
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
static struct m0_rpc_session * session
Definition: item.c:53
struct m0_sm_group rm_sm_grp
Definition: rpc_machine.h:82
uint64_t s_xid
Definition: session.h:331
static void fop_release(struct m0_ref *ref)
Definition: item.c:548
#define equi(a, b)
Definition: misc.h:297
#define M0_ENTRY(...)
Definition: trace.h:170
void(* cho_ha_timer_cb)(struct m0_sm_timer *timer)
Definition: conn_internal.h:54
uint64_t osr_xid
Definition: onwire.h:105
M0_INTERNAL struct m0_fid * m0_reqh2profile(struct m0_reqh *reqh)
Definition: reqh.c:758
void m0_rpc_item_fini(struct m0_rpc_item *item)
Definition: item.c:394
static void test_resend(void)
Definition: item.c:290
void m0_rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
Definition: rpc_machine.c:592
int i
Definition: dir.c:1033
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
Definition: conn.h:281
static const struct m0_rpc_item_ops arrow_item_ops
Definition: item.c:543
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
struct m0_fop_type m0_rpc_arrow_fopt
Definition: fops.c:39
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
void(* rito_item_get)(struct m0_rpc_item *item)
Definition: item.h:423
Definition: cnt.h:36
void m0_rpc_item_cancel(struct m0_rpc_item *item)
Definition: item.c:932
const struct m0_rpc_item_ops cs_ds_req_fop_rpc_item_ops
Definition: cs_fop.c:47
M0_INTERNAL struct m0_confc * m0_reqh2confc(struct m0_reqh *reqh)
Definition: reqh.c:753
#define M0_FID_TINIT(type, container, key)
Definition: fid.h:90
Definition: refs.h:34
M0_INTERNAL void m0_rpc_test_fops_fini(void)
Definition: fops.c:69
M0_INTERNAL int m0_rpc_conn_ha_subscribe(struct m0_rpc_conn *conn, struct m0_fid *svc_fid)
Definition: conn.c:585
M0_INTERNAL void m0_fi_disable(const char *fp_func, const char *fp_tag)
Definition: finject.c:485
static void test_reply_item_error(void)
Definition: item.c:121
static void m0_fi_enable(const char *func, const char *tag)
Definition: finject.h:276
#define M0_ASSERT(cond)
static struct m0_confc * confc
Definition: file.c:94
static void m0_fi_enable_func(const char *func, const char *tag, m0_fi_fpoint_state_func_t trigger_func, void *data)
Definition: finject.h:387
struct m0_sm_conf rit_outgoing_conf
Definition: item.h:484
M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
Definition: item.c:1129
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
struct m0_thread thread
Definition: note.c:104
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
Definition: session.c:850
bool ALREADY_REPLIED
Definition: item.c:639
M0_INTERNAL void m0_rpc_item_cache_purge(struct m0_rpc_item_cache *ic)
Definition: item.c:1639
struct m0_rpc_conn rcx_connection
Definition: rpclib.h:146
uint64_t c_magic
Definition: conn.h:336
M0_INTERNAL void m0_rpc_item_cache_del(struct m0_rpc_item_cache *ic, uint64_t xid)
Definition: item.c:1616
m0_ha_obj_state
Definition: note.h:119
struct m0_reqh rc_reqh
Definition: setup.h:312
static bool __ha_service_event(struct m0_clink *link)
Definition: item.c:1082
static void stop_rpc_client_and_server(void)
Definition: note.c:126
struct m0_conf_cache cc_cache
Definition: confc.h:394
int m0_rpc_client_stop(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:217
int m0_rpc_client_start(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:160
struct m0_rpc_item * ri_reply
Definition: item.h:163
#define IS_INCR_BY_N(p, n)
Definition: item.c:63
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
static struct m0_rpc_server_ctx sctx
Definition: console.c:88
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static bool drop_twice(void *data)
Definition: item.c:256
int m0_rpc_post_sync(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ri_ops, m0_time_t deadline)
Definition: rpclib.c:284
static void test_oneway_item(void)
Definition: item.c:554
uint64_t ri_nr_sent_max
Definition: item.h:146
#define UINT64_MAX
Definition: types.h:44
Definition: reqh.h:94
struct m0_ut_suite item_ut
Definition: item.c:1292
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
Definition: fop.c:71
static void test_dropped(struct m0_rpc_item *item)
Definition: item.c:152
static uint64_t test_item_cache_item_put_xid
Definition: item.c:917
static void _test_timer_start_failure(void)
Definition: item.c:456
struct m0_rpc_conn conn
Definition: fsync.c:96
struct m0_mutex s_lock
Definition: sm.h:514
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
uint32_t fop_dispatched_nr
Definition: item.c:817
#define IS_INCR_BY_1(p)
Definition: item.c:64
bool REINITIALISE_AFTER_CANCEL
Definition: item.c:638
#define FID_P(f)
Definition: fid.h:77
struct m0_semaphore arrow_destroyed
Definition: fops.c:42
bool(* m0_chan_cb_t)(struct m0_clink *link)
Definition: chan.h:176
static void __ha_timer__dummy(struct m0_sm_timer *timer)
Definition: item.c:1067
static bool chk_state(const struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:85
static void test_ha_notify()
Definition: item.c:1169
M0_INTERNAL void m0_rconfc_fini(struct m0_rconfc *rconfc)
Definition: rconfc.c:3009
static void session_ut_item_cb(struct m0_rpc_item *item)
Definition: item.c:823
static uint32_t timeout
Definition: console.c:52
M0_INTERNAL void m0_rpc_item_cache_clear(struct m0_rpc_item_cache *ic)
Definition: item.c:1655
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
#define M0_UT_RETURN(...)
Definition: ut.h:58
m0_time_t rchc_ha_interval
Definition: conn_internal.h:63
const char * ts_name
Definition: ut.h:99
static void _ha_do_not_notify(struct m0_rpc_conn *conn, uint8_t state)
Definition: item.c:264
static struct m0_rpc_machine * machine
Definition: item.c:52
static void test_failure_before_sending(void)
Definition: item.c:473
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
Definition: refs.c:44
uint64_t n
Definition: fops.h:107
struct m0_reqh reqh
Definition: rm_foms.c:48
struct m0_rpc_conn_ha_ops rchc_ops
Definition: conn_internal.h:62
static bool only_second_time(void *data)
Definition: item.c:248
static void _test_resend(struct m0_fop *fop, bool post_sync)
Definition: item.c:425
M0_INTERNAL void m0_rpc_conn_ha_cfg_set(struct m0_rpc_conn *conn, const struct m0_rpc_conn_ha_cfg *cfg)
Definition: conn.c:1523
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
struct m0_fid c_svc_fid
Definition: conn.h:350
struct m0_reqh_context cc_reqh_ctx
Definition: setup.h:361
struct m0_rpc_session rcx_session
Definition: rpclib.h:147
static void test_timeout(void)
Definition: item.c:157
struct m0_ref f_ref
Definition: fop.h:80
Definition: fid.h:38
static void _test_timeout(m0_time_t deadline, m0_time_t timeout, bool reset)
Definition: item.c:228
struct m0_confc rc_confc
Definition: rconfc.h:235
M0_INTERNAL bool m0_rpc_item_cache_add(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item, m0_time_t deadline)
Definition: item.c:1583
static struct m0_rpc_item_type_ops test_item_cache_type_ops
Definition: item.c:938
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
Definition: conn_internal.h:58
static void test_cancel_session(void)
Definition: item.c:832
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
Definition: fop.c:148
static int _test(void)
Definition: item.c:519
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
static int ts_item_init(void)
Definition: item.c:66
Definition: beck.c:130
M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic, struct m0_mutex *lock)
Definition: item.c:1547
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
static void test_item_cache_item_put(struct m0_rpc_item *item)
Definition: item.c:927
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
struct m0_sm_timer c_ha_timer
Definition: conn.h:291
const struct m0_sm_conf outgoing_item_sm_conf
Definition: item.c:59
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_INTERNAL int m0_rconfc_start(struct m0_rconfc *rconfc)
Definition: rconfc.c:2928
static struct m0_thread ha_thread
Definition: item.c:1041
void __ha_accept_imitate(struct m0_fid *sfid)
Definition: item.c:1044
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
struct m0_clink c_ha_clink
Definition: conn.h:339
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
Definition: conn.c:349
static void check_cancel(bool already_replied, bool reinitialise)
Definition: item.c:667
static void m0_fi_enable_once(const char *func, const char *tag)
Definition: finject.h:301
M0_INTERNAL struct m0_rpc_item * m0_rpc_item_cache_lookup(struct m0_rpc_item_cache *ic, uint64_t xid)
Definition: item.c:1631
struct m0_rconfc rh_rconfc
Definition: reqh.h:166
static void cancel_item_with_various_states(bool reinitialise)
Definition: item.c:708
M0_INTERNAL void m0_rpc_conn_ha_unsubscribe(struct m0_rpc_conn *conn)
Definition: conn.c:632
M0_INTERNAL int m0_rconfc_init(struct m0_rconfc *rconfc, const struct m0_fid *profile, struct m0_sm_group *sm_group, struct m0_rpc_machine *rmach, m0_rconfc_cb_t expired_cb, m0_rconfc_cb_t ready_cb)
Definition: rconfc.c:2860
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
static void test_simple_transitions(void)
Definition: item.c:91
#define M0_UT_PATH(name)
Definition: misc.h:41
Definition: rcv_session.c:58
static enum m0_ha_obj_state expected_state
Definition: item.c:50
struct m0_rpc_stats rm_stats
Definition: rpc_machine.h:96
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
static void test_cancel_item(void)
Definition: item.c:802
uint32_t ri_nr_sent
Definition: item.h:183
static struct m0_fid expected_fid
Definition: item.c:51
M0_INTERNAL void m0_rpc_test_fops_init(void)
Definition: fops.c:54
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
Definition: item.c:813
struct m0_reqh rcx_reqh
Definition: rpclib.h:144
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
struct m0_rpc_item f_item
Definition: fop.h:83
uint32_t sm_state
Definition: sm.h:307
static int item_rc
Definition: item.c:58
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
static void start_rpc_client_and_server(void)
Definition: note.c:111
#define M0_UT_ASSERT(a)
Definition: ut.h:46
struct m0_rpc_conn * s_conn
Definition: session.h:312
static void arrow_sent_cb(struct m0_rpc_item *item)
Definition: item.c:539
struct m0_motr rsx_motr_ctx
Definition: rpclib.h:84
static void _ha_notify(struct m0_rpc_conn *conn, uint8_t state)
Definition: item.c:268
static void conn_flag_unset(struct m0_rpc_conn *conn, uint64_t flag)
Definition: conn_internal.h:76
void(* m0_rpc__item_dropped)(struct m0_rpc_item *item)
Definition: rpc_machine.c:868
Definition: fop.h:79
static bool fop_release_called
Definition: item.c:547
static bool arrow_sent_cb_called
Definition: item.c:538
#define FID_F
Definition: fid.h:75
void m0_rpc_item_cancel_init(struct m0_rpc_item *item)
Definition: item.c:947
m0_time_t ri_deadline
Definition: item.h:141
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)
Definition: ktime.c:73