Motr  M0
item_source.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #include "ut/ut.h"
31 #include "lib/memory.h"
32 #include "lib/finject.h"
33 #include "lib/misc.h" /* M0_BITS */
34 #include "lib/time.h" /* m0_nanosleep */
35 #include "rpc/rpc.h"
36 #include "rpc/rpc_internal.h"
37 #include "rpc/ut/clnt_srv_ctx.c"
38 #include "rpc/ut/fops.h" /* m0_rpc_arrow_fopt */
39 #include "ut/cs_fop.h" /* cs_ds2_req_fop_fopt */
40 #include "ut/cs_fop_xc.h" /* cs_ds2_req_fop */
41 #include "rpc/formation2.c" /* frm_fill_packet_from_item_sources */
42 
43 #include <stdio.h>
44 
/*
 * State shared between the item-source callbacks and the tests in this file.
 * All four variables are reset (NULL / 0) by the suite initialisation body.
 */
/* Connection whose formation machine (conn->c_rpcchan->rc_frm) the tests use. */
45 static struct m0_rpc_conn *conn;
/* Item most recently produced by get_item(). */
46 static struct m0_rpc_item *item;
/* Call counters inspected by the tests (e.g. "get_item_calls == 1"). */
47 static int has_item_calls;
48 static int get_item_calls;
50 
52 {
54  has_item_calls = 0;
55  get_item_calls = 0;
56  conn = NULL;
57  item = NULL;
58 
62  return 0;
63 }
64 
66 {
67  /* rpc client-server will be stopped in conn_terminating_cb_test() */
68  if (conn_terminating_cb_called == false)
71  return 0;
72 }
73 
/*
 * "Has item" callback of the test item source (its signature matches
 * m0_rpc_item_source_ops::riso_has_item).
 *
 * The answer is driven by fault injection: the function returns the state of
 * its own "yes" fault point, which the tests arm with
 * m0_fi_enable_once("has_item", "yes").
 *
 * NOTE(review): listing lines 76 and 78 are absent from this extract, so any
 * statements on them (presumably the has_item_calls bookkeeping) are not shown
 * here -- confirm against the original file.
 */
74 static bool has_item(const struct m0_rpc_item_source *ris)
75 {
77 
79  return M0_FI_ENABLED("yes");
80 }
81 
/*
 * "Get item" callback of the test item source (installed as
 * ris_ops.riso_get_item).
 *
 * Publishes the produced item through the file-scope `item` pointer and
 * returns it. Two fault points of this function shape the item size:
 *  - "max": the guarded statement takes &cctx.rcx_session as its last
 *    argument; its first line is missing from this extract (presumably it
 *    sizes the item to the session's maximum payload -- confirm);
 *  - "not_multiple_of_8bytes": forces ri_size to max_payload_size - 1.
 *
 * @param ris              item source being queried; not referenced in the
 *                         lines visible here.
 * @param max_payload_size bound the resulting item size is compared against.
 * @return the item embedded in `fop` (&fop->f_item).
 *
 * NOTE(review): several listing lines (86, 88-90, 97, 100, 106) are absent
 * from this extract, including whatever assigns `fop`.
 */
82 static struct m0_rpc_item *get_item(struct m0_rpc_item_source *ris,
83  size_t max_payload_size)
84 {
85  struct m0_fop *fop;
87 
91  M0_UT_ASSERT(fop != NULL);
/* Remember the item at file scope so the tests can inspect it later. */
92  item = &fop->f_item;
93  /* without this "get", the item will be freed as soon as it is
94  sent/failed. The reference is required to protect item until
95  item_source_test() performs its checks on the item.
96  */
98 
99  if (M0_FI_ENABLED("max"))
101  &cctx.rcx_session);
102 
/* Deliberately pick a size that is one byte short of the limit. */
103  if (M0_FI_ENABLED("not_multiple_of_8bytes"))
104  item->ri_size = max_payload_size - 1;
105 
107  m0_rpc_item_size(item) <= max_payload_size);
108  return item;
109 }
110 
/*
 * Connection-terminating callback of the test item source (installed as
 * ris_ops.riso_conn_terminating).
 *
 * The visible code releases the item source with m0_free(), so a source that
 * reaches this callback has to be heap-allocated, as conn_terminating_cb_test()
 * does with M0_ALLOC_PTR().
 *
 * NOTE(review): listing lines 113, 115 and 116 are absent from this extract.
 */
111 static void conn_terminating(struct m0_rpc_item_source *ris)
112 {
114 
117  m0_free(ris);
118 }
119 
/*
 * Callback table passed to m0_rpc_item_source_init() by every test in this
 * file.
 *
 * NOTE(review): listing line 121 is absent from this extract; presumably it is
 * the `.riso_has_item = has_item` initialiser -- confirm.
 */
120 static const struct m0_rpc_item_source_ops ris_ops = {
122  .riso_get_item = get_item,
123  .riso_conn_terminating = conn_terminating,
124 };
125 
/*
 * Smoke test: initialises a stack-allocated item source and checks that
 * m0_rpc_item_source_init() stored the ops table pointer.
 *
 * NOTE(review): listing lines 132-134 are absent from this extract.
 */
126 static void item_source_basic_test(void)
127 {
128  struct m0_rpc_item_source ris;
129 
130  m0_rpc_item_source_init(&ris, "test-item-source", &ris_ops);
131  M0_UT_ASSERT(ris.ris_ops == &ris_ops);
135 }
136 
/*
 * Exercises item-size limits through three scenarios, one per loop iteration:
 *   0 - no get_item fault point armed (the "minimum item size" case),
 *   1 - get_item "max" fault point armed,
 *   2 - get_item "not_multiple_of_8bytes" fault point armed.
 * Every iteration allocates a fresh packet `p` and arms has_item "yes" once;
 * the visible tail of the per-iteration check expects get_item_calls == 1.
 *
 * NOTE(review): many listing lines (146, 152, 168-172, 174-180, 182-183) are
 * absent from this extract; presumably they register the source, reset the
 * counters, fill the packet from the item sources and clean up -- confirm
 * against the original file.
 */
137 static void item_source_limits_test(void)
138 {
139  struct m0_rpc_item_source ris;
140  struct m0_rpc_frm *frm;
141  struct m0_rpc_packet *p;
142  int cond;
143 
144  m0_rpc_item_source_init(&ris, "test-item-source", &ris_ops);
145  M0_UT_ASSERT(ris.ris_ops == &ris_ops);
/* Formation machine of the channel that `conn` is attached to. */
147  frm = &conn->c_rpcchan->rc_frm;
148 
149  for (cond = 0; cond < 3; cond++) {
150  M0_ALLOC_PTR(p);
151  M0_UT_ASSERT(p != NULL);
/* Make has_item() report an available item for this iteration. */
153  m0_fi_enable_once("has_item", "yes");
/* Select which size-shaping fault point of get_item() to arm. */
154  switch (cond) {
155  case 0:
156  /* For the minimum item size */
157  break;
158  case 1:
159  m0_fi_enable_once("get_item", "max");
160  break;
161  case 2:
162  m0_fi_enable_once("get_item", "not_multiple_of_8bytes");
163  break;
164  default:
165  M0_IMPOSSIBLE("not supported");
166  }
167 
173  get_item_calls == 1);
181  }
184 }
185 
/*
 * Item-pull test: runs the same scenario under two triggers (loop values 0
 * and 1) using one heap-allocated item source.
 *
 * For each trigger, has_item() is armed to answer "yes" once and the
 * "frm_is_ready"/"ready" fault point is kept enabled until the end of the
 * iteration. The visible checks expect:
 *   - get_item_calls == 1 after the trigger,
 *   - rc == 0 from a call given a 2-second deadline (m0_time_from_now(2, 0)),
 *   - get_item_calls == 0 once has_item() no longer answers "yes".
 *
 * NOTE(review): the per-trigger actions inside the switch, the first lines of
 * the multi-line checks, and the registration/deregistration of `ris` are on
 * listing lines absent from this extract -- confirm against the original file.
 */
186 static void item_source_test(void)
187 {
188  struct m0_rpc_item_source *ris;
189  int trigger;
190  int rc;
191 
192  /*
193  Test:
194  - Confirm that formation correctly pulls items and sends them.
195  - Also verify that periodic item-source drain works.
196  */
197  M0_ALLOC_PTR(ris);
198  M0_UT_ASSERT(ris != NULL);
199  m0_rpc_item_source_init(ris, "test-item-source", &ris_ops);
201 
202  for (trigger = 0; trigger < 2; trigger++) {
/* has_item() says "yes" once; the "frm_is_ready"/"ready" point stays on. */
203  m0_fi_enable_once("has_item", "yes");
204  m0_fi_enable("frm_is_ready", "ready");
/* One case per trigger under test; the bodies are on missing lines. */
206  switch (trigger) {
207  case 0:
211  break;
212  case 1:
215  128);
217  break;
218  default:
219  M0_IMPOSSIBLE("only two triggers");
220  }
222  get_item_calls == 1);
225  m0_time_from_now(2, 0));
226  M0_UT_ASSERT(rc == 0);
228 
229  /* riso_has_item() is set to return false.
230  Test that get_item does not get called when has_item
231  returns false
232  */
238  get_item_calls == 0);
239 
/* Undo the m0_fi_enable() from the top of this iteration. */
240  m0_fi_disable("frm_is_ready", "ready");
244  }
247  m0_free(ris);
248 }
249 
/*
 * Checks the riso_conn_terminating() path: a heap-allocated item source is
 * initialised here and, as the comment in the body states, the callback is
 * expected to run for sources still registered when the rpc-conn terminates.
 *
 * `ris` is not freed in the lines visible here; conn_terminating() calls
 * m0_free() on the source it receives.
 *
 * NOTE(review): listing lines 257, 259, 260 and 264 are absent from this
 * extract (presumably registration, connection shutdown and the final
 * assertion) -- confirm against the original file.
 */
250 static void conn_terminating_cb_test(void)
251 {
252  struct m0_rpc_item_source *ris;
253 
254  M0_ALLOC_PTR(ris);
255  M0_UT_ASSERT(ris != NULL);
256  m0_rpc_item_source_init(ris, "test-item-source", &ris_ops);
258 
261  /* riso_conn_terminating() callback will be called on item-sources
262  which were still registered when rpc-conn was being terminated
263  */
265 }
266 
268  .ts_name = "rpc-item-source-ut",
269  .ts_init = item_source_test_suite_init,
270  .ts_fini = item_source_test_suite_fini,
271  .ts_tests = {
272  { "basic", item_source_basic_test },
273  { "item_source_limits", item_source_limits_test },
274  { "item_pull", item_source_test },
275  { "conn_terminating_cb_test", conn_terminating_cb_test },
276  { NULL, NULL },
277  }
278 };
279 
283 /*
284  * Local variables:
285  * c-indentation-style: "K&R"
286  * c-basic-offset: 8
287  * tab-width: 8
288  * fill-column: 80
289  * scroll-step: 1
290  * End:
291  */
292 /*
293  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
294  */
void m0_rpc_item_source_fini(struct m0_rpc_item_source *ris)
Definition: item_source.c:70
void m0_rpc_item_source_register(struct m0_rpc_conn *conn, struct m0_rpc_item_source *ris)
Definition: item_source.c:95
static struct m0_addb2_philter p
Definition: consumer.c:40
const struct m0_rpc_item_source_ops * ris_ops
Definition: item_source.h:63
struct m0_rpc_conn * ris_conn
Definition: item_source.h:64
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
Definition: item.c:728
#define NULL
Definition: misc.h:38
static void conn_terminating(struct m0_rpc_item_source *ris)
Definition: item_source.c:111
static bool has_item(const struct m0_rpc_item_source *ris)
Definition: item_source.c:74
void m0_rpc_item_source_deregister(struct m0_rpc_item_source *ris)
Definition: item_source.c:107
size_t ri_size
Definition: item.h:198
void m0_rpc_item_put(struct m0_rpc_item *item)
Definition: item.c:443
static struct m0_rpc_item * item
Definition: item_source.c:46
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
struct m0_sm ri_sm
Definition: item.h:181
void m0_rpc_item_source_init(struct m0_rpc_item_source *ris, const char *name, const struct m0_rpc_item_source_ops *ops)
Definition: item_source.c:52
static int item_source_test_suite_fini(void)
Definition: item_source.c:65
static struct m0_rpc_client_ctx cctx
Definition: rconfc.c:69
struct m0_ut_suite item_source_ut
Definition: item_source.c:267
#define M0_BITS(...)
Definition: misc.h:236
Definition: ut.h:77
struct m0_rpc_chan * c_rpcchan
Definition: conn.h:317
static void item_source_limits_test(void)
Definition: item_source.c:137
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
Definition: item.c:470
M0_INTERNAL void m0_rpc_packet_init(struct m0_rpc_packet *p, struct m0_rpc_machine *rmach)
Definition: packet.c:103
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_payload_size(const struct m0_rpc_session *session)
Definition: session.c:775
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
static void frm_fill_packet_from_item_sources(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
Definition: formation2.c:570
static int has_item_calls
Definition: item_source.c:47
struct m0_fop_type m0_rpc_arrow_fopt
Definition: fops.c:39
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
M0_INTERNAL struct m0_rpc_machine * frm_rmachine(const struct m0_rpc_frm *frm)
Definition: formation2.c:161
M0_INTERNAL void m0_rpc_test_fops_fini(void)
Definition: fops.c:69
M0_INTERNAL void m0_fi_disable(const char *fp_func, const char *fp_tag)
Definition: finject.c:485
static void m0_fi_enable(const char *func, const char *tag)
Definition: finject.h:276
static void item_source_test(void)
Definition: item_source.c:186
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
struct m0_rpc_conn rcx_connection
Definition: rpclib.h:146
static void stop_rpc_client_and_server(void)
Definition: note.c:126
static const struct m0_rpc_item_source_ops ris_ops
Definition: item_source.c:120
struct m0_rpc_frm rc_frm
M0_INTERNAL void m0_rpc_frm_run_formation(struct m0_rpc_frm *frm)
Definition: formation2.c:684
struct m0_rpc_machine machine
Definition: mdstore.c:58
static struct m0_rpc_item * get_item(struct m0_rpc_item_source *ris, size_t max_payload_size)
Definition: item_source.c:82
const char * ts_name
Definition: ut.h:99
static struct m0_rpc_frm * frm
Definition: formation2.c:34
static bool conn_terminating_cb_called
Definition: item_source.c:49
struct m0_rpc_session rcx_session
Definition: rpclib.h:147
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL void m0_rpc_machine_drain_item_sources(struct m0_rpc_machine *machine, uint32_t max_per_source)
Definition: rpc_machine.c:404
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL void m0_rpc_packet_discard(struct m0_rpc_packet *packet)
Definition: packet.c:134
static struct m0_fop * fop
Definition: item.c:57
static void m0_fi_enable_once(const char *func, const char *tag)
Definition: finject.h:301
static int item_source_test_suite_init(void)
Definition: item_source.c:51
static struct m0_rpc_conn * conn
Definition: item_source.c:45
M0_INTERNAL void m0_rpc_test_fops_init(void)
Definition: fops.c:54
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
Definition: item.c:813
void m0_free(void *data)
Definition: memory.c:146
struct m0_rpc_item f_item
Definition: fop.h:83
static void conn_terminating_cb_test(void)
Definition: item_source.c:250
uint32_t sm_state
Definition: sm.h:307
bool m0_rpc_item_source_is_registered(const struct m0_rpc_item_source *ris)
Definition: item_source.c:77
int32_t rc
Definition: trigger_fop.h:47
static void start_rpc_client_and_server(void)
Definition: note.c:111
#define M0_UT_ASSERT(a)
Definition: ut.h:46
Definition: fop.h:79
bool(* riso_has_item)(const struct m0_rpc_item_source *ris)
Definition: item_source.h:85
static void item_source_basic_test(void)
Definition: item_source.c:126
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
#define M0_IMPOSSIBLE(fmt,...)
static int get_item_calls
Definition: item_source.c:48