Motr  M0
ub.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
24 #include "lib/trace.h"
25 
26 #include "lib/ub.h" /* m0_ub_set */
27 #include "lib/misc.h" /* M0_IN, M0_BITS */
28 #include "lib/string.h" /* strlen, m0_strdup */
29 #include "lib/memory.h" /* m0_free */
30 #include "fop/fop.h" /* m0_fop_alloc */
31 #include "net/bulk_mem.h" /* m0_net_bulk_mem_xprt */
32 #include "net/lnet/lnet.h" /* m0_net_lnet_xprt */
33 #include "ut/cs_service.h" /* m0_cs_default_stypes */
34 #include "ut/misc.h" /* M0_UT_PATH */
35 #include "rpc/rpclib.h" /* m0_rpc_server_ctx, m0_rpc_client_ctx */
36 #include "rpc/session.h" /* m0_rpc_session_timedwait */
37 #include "rpc/ub/fops.h"
38 
39 /* ----------------------------------------------------------------
40  * CLI arguments
41  * ---------------------------------------------------------------- */
42 
43 /* X(name, defval, max) */
44 #define ARGS \
45  X(nr_conns, 2, 10000) \
46  X(nr_msgs, 1000, 5000) \
47  X(msg_len, 32, 8192)
48 
49 struct args {
50 #define X(name, defval, max) unsigned int a_ ## name;
51  ARGS
52 #undef X
53 };
54 
55 static struct args g_args;
56 
58 static void args_init(struct args *args)
59 {
60 #define X(name, defval, max) args->a_ ## name = defval;
61  ARGS
62 #undef X
63 }
64 
65 static int args_check_limits(const struct args *args)
66 {
67 #define X(name, defval, max) \
68  && 0 < args->a_ ## name && args->a_ ## name <= max
69 
70  if (true ARGS)
71  return 0;
72 #undef X
73 
74  fprintf(stderr, "Value is out of bounds\n");
75  return -EINVAL;
76 }
77 
78 struct match {
79  const char *m_pattern;
80  unsigned int *m_dest;
81 };
82 
83 static bool token_matches(const char *token, const struct match *tbl)
84 {
85  for (; tbl->m_pattern != NULL; ++tbl) {
86  int rc = sscanf(token, tbl->m_pattern, tbl->m_dest);
87  if (rc == 1)
88  return true;
89  }
90  return false;
91 }
92 
93 static void args_help(void)
94 {
95  fprintf(stderr, "Expecting a comma-separated list of parameter"
96  " specifications:\n");
97 #define X(name, defval, max) \
98  fprintf(stderr, " %s=NUM\t(default = %u, ulimit = %u)\n", \
99  #name, defval, max);
100  ARGS
101 #undef X
102 }
103 
104 static int args_parse(const char *src, struct args *dest)
105 {
106  if (src == NULL || *src == 0)
107  return 0;
108 
109  char *s;
110  char *token;
111  const struct match match_tbl[] = {
112 #define X(name, defval, max) { #name "=%u", &dest->a_ ## name },
113  ARGS
114 #undef X
115  { NULL, NULL }
116  };
117 
118  s = strdupa(src);
119  if (s == NULL)
120  return -ENOMEM;
121 
122  while ((token = strsep(&s, ",")) != NULL) {
123  if (!token_matches(token, match_tbl)) {
124  fprintf(stderr, "Unable to parse `%s'\n", token);
125  args_help();
126  return -EINVAL;
127  }
128  }
129  return args_check_limits(dest);
130 }
131 
132 #undef ARGS
133 
134 /* ----------------------------------------------------------------
135  * RPC client and server definitions
136  * ---------------------------------------------------------------- */
137 
138 enum {
140  MAX_RPCS_IN_FLIGHT = 10, /* XXX CONFIGUREME */
141  MAX_RETRIES = 500,
143 };
144 
145 /* #define UB_USE_LNET_XPORT */
146 #ifdef UB_USE_LNET_XPORT
147 # define CLIENT_ENDPOINT_FMT "0@lo:12345:34:%d"
148 # define SERVER_ENDPOINT_ADDR "0@lo:12345:32:1"
149 # define SERVER_ENDPOINT M0_NET_XPRT_PREFIX_DEFAULT":"SERVER_ENDPOINT_ADDR
150 #ifdef ENABLE_LIBFAB
151 static struct m0_net_xprt *g_xprt = &m0_net_libfab_xprt;
152 #else
153 static struct m0_net_xprt *g_xprt = &m0_net_lnet_xprt;
154 #endif
155 #else
156 # define CLIENT_ENDPOINT_FMT "127.0.0.1:%d"
157 # define SERVER_ENDPOINT_ADDR "127.0.0.1:1"
158 # define SERVER_ENDPOINT "bulk-mem:" SERVER_ENDPOINT_ADDR
160 #endif
161 
166 };
167 
168 static struct ub_rpc_client *g_clients;
169 
171 
172 #define NAME(ext) "rpc-ub" ext
173 static char *g_argv[] = {
174  NAME(""), "-Q", "200" /* MIN_RECV_QUEUE_LEN */, "-w", "10",
175  "-T", "AD", "-D", NAME(".db"), "-S", NAME(".stob"),
176  "-A", "linuxstob:"NAME(".addb-stob"),
178  "-f", M0_UT_CONF_PROCESS,
179  "-c", M0_UT_PATH("conf.xc")
180 };
181 
182 static struct m0_rpc_server_ctx g_sctx = {
183  .rsx_xprts = &g_xprt,
184  .rsx_xprts_nr = 1,
185  .rsx_argv = g_argv,
186  .rsx_argc = ARRAY_SIZE(g_argv),
187  .rsx_log_file_name = NAME(".log")
188 };
189 #undef NAME
190 
191 static void ub_item_replied(struct m0_rpc_item *item);
192 
193 static const struct m0_rpc_item_ops ub_item_ops = {
195 };
196 
197 /* ----------------------------------------------------------------
198  * RPC client and server operations
199  * ---------------------------------------------------------------- */
200 
201 static void _client_start(struct ub_rpc_client *client, uint32_t cob_dom_id,
202  const char *ep)
203 {
204  int rc;
205  struct m0_fid process_fid = M0_FID_TINIT('r', 2, 1);
206 
207  rc = m0_net_domain_init(&client->rc_net_dom, g_xprt);
208  M0_ASSERT(rc == 0);
209 
210  client->rc_ctx = (struct m0_rpc_client_ctx){
211  .rcx_net_dom = &client->rc_net_dom,
212  .rcx_local_addr = m0_strdup(ep),
213  .rcx_remote_addr = SERVER_ENDPOINT_ADDR,
214  .rcx_max_rpcs_in_flight = MAX_RPCS_IN_FLIGHT,
215  .rcx_recv_queue_min_length = MIN_RECV_QUEUE_LEN,
216  .rcx_fid = &process_fid,
217  };
218  rc = m0_rpc_client_start(&client->rc_ctx);
219  if (rc != 0)
220  M0_LOG(M0_FATAL, "rc=%d", rc);
221  M0_ASSERT(rc == 0);
222 }
223 
224 static void _client_stop(struct ub_rpc_client *client)
225 {
226  int rc;
227 
228  rc = m0_rpc_client_stop(&client->rc_ctx);
229  M0_ASSERT(rc == 0);
230  m0_free((void *)client->rc_ctx.rcx_local_addr);
231  m0_net_domain_fini(&client->rc_net_dom);
232 }
233 
234 static int _start(const char *opts)
235 {
236  int i;
237  int rc;
238  char ep[40];
239 
240  args_init(&g_args);
242  if (rc != 0)
243  return rc;
244 
245  M0_ALLOC_ARR(g_clients, g_args.a_nr_conns);
246  if (g_clients == NULL) {
247  rc = -ENOMEM;
248  goto err;
249  }
250 
251  for (i = 0; i < g_args.a_nr_conns; ++i) {
252  snprintf(ep, sizeof ep, CLIENT_ENDPOINT_FMT,
253  2 + i); /* 1 is server EP, so we start from 2 */
255  }
256 
258  return 0;
259 err:
261  return rc;
262 }
263 
264 static void _stop(void)
265 {
266  int i;
267 
268  for (i = 0; i < g_args.a_nr_conns; ++i)
273 }
274 
275 static void ub_item_replied(struct m0_rpc_item *item)
276 {
277  struct ub_resp *resp;
278  struct ub_req *req;
279  int32_t err;
280 
281  err = m0_rpc_item_error(item);
282  if (err != 0)
283  M0_LOG(M0_FATAL, "err=%d", err);
284  M0_UB_ASSERT(err == 0);
285 
288  M0_UB_ASSERT(resp->ur_seqn == req->uq_seqn);
289  M0_UB_ASSERT(m0_buf_eq(&resp->ur_data, &req->uq_data));
290 }
291 
292 static void fop_send(struct m0_rpc_session *session, size_t msg_id)
293 {
294  struct m0_fop *fop;
295  struct ub_req *req;
296  struct m0_rpc_item *item;
297  char *data;
298  int rc;
299 
300  M0_PRE(g_args.a_msg_len > 0);
301 
303  M0_UB_ASSERT(fop != NULL);
304 
305  req = m0_fop_data(fop);
306  req->uq_seqn = msg_id;
307  M0_ALLOC_ARR(data, g_args.a_msg_len);
308  M0_UB_ASSERT(data != NULL);
309  /* `data' will be freed by rpc layer (see m0_fop_fini()) */
310  m0_buf_init(&req->uq_data, data, g_args.a_msg_len);
311 
312  item = &fop->f_item;
314  item->ri_ops = &ub_item_ops;
317  item->ri_prio = M0_RPC_ITEM_PRIO_MID; /* XXX CONFIGUREME */
318 
319  rc = m0_rpc_post(item);
320  M0_UB_ASSERT(rc == 0);
322 }
323 
324 /* ----------------------------------------------------------------
325  * Benchmark
326  * ---------------------------------------------------------------- */
327 
328 static struct m0_rpc_session *_session(unsigned int i)
329 {
330  M0_PRE(i < g_args.a_nr_conns);
331  return &g_clients[i].rc_ctx.rcx_session;
332 }
333 
334 static void run(int iter M0_UNUSED)
335 {
336  int n;
337  int k;
338  int rc;
339 
340  M0_PRE(g_args.a_nr_msgs > 0 && g_args.a_nr_conns > 0);
341 
342  /* @todo: For some reason the following error may occur here:
343  motr: NOTICE : [rpc/slot.c:584:m0_rpc_slot_reply_received] < rc=-71.
344  Needs investigation!
345  */
346  for (n = 0; n < g_args.a_nr_msgs; ++n) {
347  for (k = 0; k < g_args.a_nr_conns; ++k)
348  fop_send(_session(k), n);
349  }
350 
351  for (k = 0; k < g_args.a_nr_conns; ++k) {
355  M0_TIME_NEVER);
356  M0_UB_ASSERT(rc == 0);
357  }
358 }
359 
361  .us_name = "rpc-ub",
362  .us_init = _start,
363  .us_fini = _stop,
364  .us_run = {
365  { .ub_name = "run",
366  .ub_iter = 1,
367  .ub_round = run },
368  { .ub_name = NULL } /* terminator */
369  }
370 };
371 
372 #undef M0_TRACE_SUBSYSTEM
struct m0_rpc_client_ctx rc_ctx
Definition: ub.c:163
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static void ub_item_replied(struct m0_rpc_item *item)
Definition: ub.c:275
void m0_net_domain_fini(struct m0_net_domain *dom)
Definition: domain.c:71
static struct args g_args
Definition: ub.c:55
#define m0_strdup(s)
Definition: string.h:43
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
#define ARGS
Definition: ub.c:44
#define NULL
Definition: misc.h:38
static void _client_stop(struct ub_rpc_client *client)
Definition: ub.c:224
#define M0_UB_ASSERT(cond)
Definition: ub.h:37
M0_INTERNAL void m0_rpc_ub_fops_init(void)
Definition: fops.c:114
const struct m0_net_xprt m0_net_lnet_xprt
Definition: lnet_xo.c:679
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
Definition: ub.c:78
static struct io_request req
Definition: file.c:100
M0_INTERNAL bool m0_buf_eq(const struct m0_buf *x, const struct m0_buf *y)
Definition: buf.c:90
static const struct m0_rpc_item_ops ub_item_ops
Definition: ub.c:193
#define M0_LOG(level,...)
Definition: trace.h:167
#define NAME(ext)
Definition: ub.c:172
int m0_rpc_server_start(struct m0_rpc_server_ctx *sctx)
Definition: rpclib.c:50
static void fop_send(struct m0_rpc_session *session, size_t msg_id)
Definition: ub.c:292
Definition: fops.h:38
M0_INTERNAL void m0_buf_init(struct m0_buf *buf, void *data, uint32_t nob)
Definition: buf.c:37
struct m0_bufvec data
Definition: di.c:40
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
#define M0_BITS(...)
Definition: misc.h:236
static struct m0_rpc_session session
Definition: formation2.c:38
struct m0_ub_set m0_rpc_ub
Definition: ub.c:360
Definition: ub.c:49
const char * m_pattern
Definition: ub.c:79
static void args_help(void)
Definition: ub.c:93
static void _stop(void)
Definition: ub.c:264
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_cob_domain rc_cob_dom
Definition: ub.c:165
static struct m0_rpc_session * _session(unsigned int i)
Definition: ub.c:328
Definition: fops.h:32
struct m0_net_domain rc_net_dom
Definition: ub.c:164
Definition: sock.c:754
int i
Definition: dir.c:1033
static int _start(const char *opts)
Definition: ub.c:234
#define M0_FID_TINIT(type, container, key)
Definition: fid.h:90
#define M0_ASSERT(cond)
const char * us_name
Definition: ub.h:76
static struct m0_rpc_server_ctx g_sctx
Definition: ub.c:182
struct m0_buf ur_data
Definition: fops.h:41
Definition: client.h:37
int m0_rpc_client_stop(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:217
int m0_rpc_client_start(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:160
struct m0_net_xprt ** rsx_xprts
Definition: rpclib.h:69
struct m0_rpc_item * ri_reply
Definition: item.h:163
#define SERVER_ENDPOINT_ADDR
Definition: ub.c:157
M0_INTERNAL int m0_rpc_session_timedwait(struct m0_rpc_session *session, uint64_t states, const m0_time_t abs_timeout)
Definition: session.c:332
uint64_t ri_nr_sent_max
Definition: item.h:146
#define CLIENT_ENDPOINT_FMT
Definition: ub.c:156
static void token(struct ff2c_context *ctx, struct ff2c_term *term, struct ff2c_token *tok)
Definition: parser.c:66
uint64_t ur_seqn
Definition: fops.h:40
static void args_init(struct args *args)
Definition: ub.c:58
#define SERVER_ENDPOINT
Definition: ub.c:158
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
M0_BASSERT(MIN_RECV_QUEUE_LEN==200)
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
uint64_t n
Definition: fops.h:107
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
Definition: domain.c:36
struct m0_rpc_session rcx_session
Definition: rpclib.h:147
Definition: fid.h:38
static int args_parse(const char *src, struct args *dest)
Definition: ub.c:104
static int args_check_limits(const struct args *args)
Definition: ub.c:65
static char * g_argv[]
Definition: ub.c:173
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
#define M0_UT_CONF_PROCESS
Definition: misc.h:45
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
struct m0_rpc_session * ri_session
Definition: item.h:147
struct m0_fop * m0_fop_alloc_at(struct m0_rpc_session *sess, struct m0_fop_type *fopt)
Definition: fop.c:122
struct m0_fop_type m0_rpc_ub_req_fopt
Definition: fops.c:33
static struct ub_rpc_client * g_clients
Definition: ub.c:168
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
static bool token_matches(const char *token, const struct match *tbl)
Definition: ub.c:83
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
unsigned int * m_dest
Definition: ub.c:80
void m0_rpc_server_stop(struct m0_rpc_server_ctx *sctx)
Definition: rpclib.c:85
static void _client_start(struct ub_rpc_client *client, uint32_t cob_dom_id, const char *ep)
Definition: ub.c:201
M0_INTERNAL void m0_rpc_ub_fops_fini(void)
Definition: fops.c:133
#define M0_UT_PATH(name)
Definition: misc.h:41
static struct m0_net_xprt * g_xprt
Definition: ub.c:159
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
static void run(int iter M0_UNUSED)
Definition: ub.c:334
struct m0_rpc_item f_item
Definition: fop.h:83
struct m0_pdclust_src_addr src
Definition: fd.c:108
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
Definition: ub.h:74
static const char * process_fid
Definition: idx_dix.c:73
Definition: fop.h:79
const struct m0_net_xprt m0_net_bulk_mem_xprt
Definition: mem_xprt_xo.c:761
m0_time_t ri_deadline
Definition: item.h:141
#define M0_UNUSED
Definition: misc.h:380