Motr  M0
rpc.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 #include "lib/errno.h"
28 #include "lib/misc.h" /* M0_IN */
29 #include "lib/types.h"
30 #include "lib/finject.h"
31 
32 #include "rpc/link.h" /* m0_rpc_link_module_init */
33 #include "rpc/rpc.h"
34 #include "rpc/rpc_internal.h"
35 #include "rpc/service.h"
36 #include "net/lnet/lnet.h"
/*
 * Start-up entry point of the rpc layer.
 *
 * Returns, through M0_RC(), the value of a "?:" chain of initialisation
 * calls: the chain evaluates to the first non-zero result, or to the last
 * operand if every earlier call returned 0. Only the first call,
 * m0_rpc_item_module_init(), is visible here; the remaining operands are
 * elided from this listing.
 */
42 M0_INTERNAL int m0_rpc_init(void)
43 {
44  M0_ENTRY();
45  return M0_RC(m0_rpc_item_module_init() ?:
49 }
50 
/*
 * Shut-down entry point of the rpc layer.
 *
 * NOTE(review): the statements between M0_ENTRY() and M0_LEAVE() are elided
 * from this listing; presumably they undo what m0_rpc_init() set up --
 * confirm against the full file.
 */
51 M0_INTERNAL void m0_rpc_fini(void)
52 {
53  M0_ENTRY();
54 
59 
60  M0_LEAVE();
61 }
62 
/*
 * Posts @item for sending.
 *
 * @param item  rpc item to post.
 * @return      the value of the local rc, passed through M0_RC().
 *
 * NOTE(review): the statements that assign "machine" and compute rc are
 * elided from this listing. The "machine" local and the sibling
 * m0_rpc__post_locked() suggest this wraps the locked variant under the rpc
 * machine lock -- confirm against the full file.
 */
63 M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
64 {
65  int rc;
66  struct m0_rpc_machine *machine;
67 
68  M0_ENTRY("%p[%s/%u]", item, item_kind(item),
72 
75 
79  return M0_RC(rc);
80 }
81 M0_EXPORTED(m0_rpc_post);
82 
/*
 * Variant of m0_rpc_post() whose name indicates the caller already holds the
 * required lock (NOTE(review): the lock assertion itself is not visible in
 * this listing -- confirm).
 *
 * Precondition: item != NULL and item->ri_type != NULL.
 *
 * Return values visible in this listing:
 *  - -ENOTCONN  on the branch logged as "Session isn't established";
 *  - -ECANCELED on the branch logged as "Session is cancelled", which is
 *               only considered for sessions whose id is not SESSION_ID_0
 *               (the second half of that condition is elided);
 *  - item->ri_error otherwise, i.e. on the path that is not rejected.
 *
 * The conditions guarding the first branch and the statements of the final
 * "else" branch are elided from this listing.
 */
83 M0_INTERNAL int m0_rpc__post_locked(struct m0_rpc_item *item)
84 {
85  struct m0_rpc_session *session;
86  int error;
87  M0_PRE(item != NULL && item->ri_type != NULL);
88 
89  M0_ENTRY("%p[%s/%u]",
92 
98  /*
99  * For requests an additional reference is taken, this reference is
100  * released after ->rio_replied() is called.
101  */
106 
112 
115  M0_LOG(M0_DEBUG, "%p[%s/%u], fop %p, session %p, "
116  "Session isn't established. Hence, not posting the item",
119  error = M0_ERR(-ENOTCONN);
120  } else if (session->s_session_id != SESSION_ID_0 &&
122  M0_LOG(M0_DEBUG, "%p[%s/%u], fop %p, session %p, "
123  "Session is cancelled. Hence, not posting the item",
126  error = M0_ERR(-ECANCELED);
127  } else {
 /* Accepted path: report whatever error the item carries. */
129  return M0_RC(item->ri_error);
130  }
 /* Rejected paths fall through to here with the error set above. */
132  return error;
133 }
134 
/*
 * Prepares @reply as the reply to @request and hands both to
 * m0_rpc_item_send_reply().
 *
 * @param request  the request item being answered; must be non-NULL and
 *                 have a non-NULL ri_session.
 * @param reply    the reply item; must be non-NULL and have a non-NULL
 *                 ri_type.
 *
 * The reply inherits the request's session, rpc machine and priority; its
 * resend interval is set to M0_TIME_NEVER, its deadline and error to 0, and
 * its ri_rpc_time to the current time.
 *
 * NOTE(review): a few lines are elided from this listing (after the
 * preconditions and around the m0_rpc_item_send_reply() call); the
 * "machine" local is assigned here but its use is among the elided lines,
 * presumably to lock/unlock the rpc machine -- confirm.
 */
135 void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
136 {
137  struct m0_rpc_machine *machine;
138 
139  M0_ENTRY("req_item: %p, rep_item: %p", request, reply);
140  M0_PRE(request != NULL && reply != NULL);
141  M0_PRE(request->ri_session != NULL);
142  M0_PRE(reply->ri_type != NULL);
146 
 /*
  * Fault-injection hook: when "delay_reply" is enabled the reply is
  * held back. The call itself is elided; the visible argument
  * 200 * 1000 * 1000 is presumably a nanosecond count (200 ms) --
  * confirm.
  */
147  if (M0_FI_ENABLED("delay_reply")) {
148  M0_LOG(M0_DEBUG, "%p reply delayed", request);
150  200 * 1000 * 1000), NULL);
151  }
152 
153  reply->ri_resend_interval = M0_TIME_NEVER;
154  reply->ri_rpc_time = m0_time_now();
 /* The reply travels on the same session and machine as the request. */
155  reply->ri_session = request->ri_session;
156  machine = reply->ri_rmachine = request->ri_rmachine;
157 
158  reply->ri_prio = request->ri_prio;
159  reply->ri_deadline = 0;
160  reply->ri_error = 0;
161 
164  m0_rpc_item_send_reply(request, reply);
166 }
167 M0_EXPORTED(m0_rpc_reply_post);
168 
/*
 * Posts a one-way item on connection @conn.
 *
 * @param conn  connection to post on; must be non-NULL.
 * @param item  the item to post.
 *
 * NOTE(review): everything after the conn precondition is elided from this
 * listing. The "machine" local and the sibling
 * m0_rpc_oneway_item_post_locked() suggest this takes the rpc machine lock
 * around the locked variant -- confirm against the full file.
 */
169 M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn,
170  struct m0_rpc_item *item)
171 {
172  struct m0_rpc_machine *machine;
173 
174  M0_ENTRY("conn: %p, item: %p", conn, item);
175  M0_PRE(conn != NULL);
177 
182 }
183 
/*
 * Variant of m0_rpc_oneway_item_post() whose name indicates the caller
 * already holds the required lock.
 *
 * @param conn  connection to post on; must be non-NULL (further
 *              precondition terms are elided from this listing).
 * @param item  the one-way item; its ri_nr_sent counter is incremented.
 *
 * NOTE(review): the statements that take the internal reference described
 * below and that actually queue/send the item are elided from this listing.
 */
184 M0_INTERNAL void m0_rpc_oneway_item_post_locked(const struct m0_rpc_conn *conn,
185  struct m0_rpc_item *item)
186 {
187  M0_PRE(conn != NULL &&
190 
191  /*
192  * Rpc always acquires an *internal* reference to "all" items (Here
193  * one-way items). This reference is released when the item is sent.
194  */
199 
201  item->ri_nr_sent++;
203 }
204 
/*
 * Waits on @clink via m0_chan_timedwait() and maps the outcome to an error
 * code.
 *
 * @param clink    clink to wait on; must be non-NULL.
 * @param timeout  passed unchanged to m0_chan_timedwait(); presumably an
 *                 absolute time rather than a duration -- confirm against
 *                 m0_chan_timedwait()'s contract.
 * @return         0 if m0_chan_timedwait() returned true, -ETIMEDOUT if it
 *                 returned false.
 */
205 M0_INTERNAL int m0_rpc_reply_timedwait(struct m0_clink *clink,
206  const m0_time_t timeout)
207 {
208  int rc;
209  M0_ENTRY("timeout: "TIME_F, TIME_P(timeout));
210  M0_PRE(clink != NULL);
212 
213  rc = m0_chan_timedwait(clink, timeout) ? 0 : -ETIMEDOUT;
214  return M0_RC(rc);
215 }
216 M0_EXPORTED(m0_rpc_reply_timedwait);
217 
218 
220 {
 /*
  * Callback registered as .nbpo_below_threshold in b_ops. The body is
  * empty: no action is taken on this notification.
  */
221  /* Buffer pool is below threshold. */
222 }
223 
/*
 * Buffer pool callbacks that m0_rpc_net_buffer_pool_setup() installs on the
 * application pool (app_pool->nbp_ops). One initialiser line is elided from
 * this listing; the visible one routes the below-threshold notification to
 * rpc_buffer_pool_low().
 */
224 static const struct m0_net_buffer_pool_ops b_ops = {
226  .nbpo_below_threshold = rpc_buffer_pool_low,
227 };
228 
/*
 * Sets up @app_pool as an rpc network buffer pool for domain @ndom.
 *
 * @param ndom      network domain; must be non-NULL.
 * @param app_pool  pool to set up; must be non-NULL. Its nbp_ops is pointed
 *                  at b_ops.
 * @param bufs_nr   number of buffers wanted; must be non-zero.
 * @param tm_nr     forwarded to the (elided) pool initialisation call.
 * @return          0 when rc equals bufs_nr after the elided provisioning
 *                  step, -ENOMEM when it does not, or the initialisation
 *                  error (reported through M0_ERR_INFO()) if pool
 *                  initialisation fails.
 *
 * NOTE(review): the declaration and assignment of seg_size, the head of the
 * initialisation call and the whole provisioning step are elided from this
 * listing. The visible arguments (segs_nr, seg_size, tm_nr, M0_SEG_SHIFT,
 * false) are presumably the tail of a m0_net_buffer_pool_init() call, and rc
 * is presumably then re-assigned from m0_net_buffer_pool_provision() --
 * confirm against the full file.
 */
229 M0_INTERNAL int m0_rpc_net_buffer_pool_setup(struct m0_net_domain *ndom,
230  struct m0_net_buffer_pool
231  *app_pool, uint32_t bufs_nr,
232  uint32_t tm_nr)
233 {
234  int rc;
235  uint32_t segs_nr;
237 
238  M0_ENTRY("net_dom: %p", ndom);
239  M0_PRE(ndom != NULL);
240  M0_PRE(app_pool != NULL);
241  M0_PRE(bufs_nr != 0);
242 
244  segs_nr = m0_rpc_max_segs_nr(ndom);
245  app_pool->nbp_ops = &b_ops;
248  segs_nr, seg_size, tm_nr, M0_SEG_SHIFT,
249  false);
250  if (rc != 0)
251  return M0_ERR_INFO(rc, "net_buf_pool: Initialization");
252 
256 
 /* Any shortfall against the requested buffer count is reported as -ENOMEM. */
257  if (rc == bufs_nr)
258  return M0_RC(0);
260  return M0_RC(-ENOMEM);
261 }
262 M0_EXPORTED(m0_rpc_net_buffer_pool_setup);
263 
265 {
 /*
  * Requires a non-NULL app_pool. NOTE(review): the statement that
  * performs the actual clean-up is elided from this listing; presumably
  * it finalises the pool -- confirm against the full file.
  */
266  M0_PRE(app_pool != NULL);
268 }
269 M0_EXPORTED(m0_rpc_net_buffer_pool_cleanup);
270 
/*
 * Computes a buffer count from @len and @tms_nr.
 *
 * @param len     base count; added in unchanged.
 * @param tms_nr  count used to derive the second addend,
 *                max32u(tms_nr / 4, 1), i.e. a quarter of tms_nr (integer
 *                division) but never less than 1.
 * @return        len + max32u(tms_nr / 4, 1) + a third addend that is
 *                elided from this listing.
 */
271 M0_INTERNAL uint32_t m0_rpc_bufs_nr(uint32_t len, uint32_t tms_nr)
272 {
273  return len +
274  /* It is used so that more than one free buffer is present
275  * for each TM when tms_nr > 8.
276  */
277  max32u(tms_nr / 4, 1) +
278  /* It is added so that frequent low_threshold callbacks of
279  * buffer pool can be reduced.
280  */
282 }
283 
285 {
 /*
  * Requires a non-NULL domain with a non-NULL transport (one further
  * line is elided from this listing), then returns whatever the
  * transport's xo_rpc_max_seg_size() operation reports for ndom.
  */
286  M0_PRE(ndom != NULL);
287  M0_PRE(ndom->nd_xprt != NULL);
289 
290  return ndom->nd_xprt->nx_ops->xo_rpc_max_seg_size(ndom);
291 }
292 
/*
 * Returns the value reported by the transport's xo_rpc_max_segs_nr()
 * operation for @ndom.
 *
 * @param ndom  network domain; must be non-NULL and have a non-NULL nd_xprt
 *              (one further line is elided from this listing).
 * @return      result of ndom->nd_xprt->nx_ops->xo_rpc_max_segs_nr(ndom).
 */
293 M0_INTERNAL uint32_t m0_rpc_max_segs_nr(struct m0_net_domain *ndom)
294 {
295  M0_PRE(ndom != NULL);
296  M0_PRE(ndom->nd_xprt != NULL);
298 
299  return ndom->nd_xprt->nx_ops->xo_rpc_max_segs_nr(ndom);
300 }
301 
303  m0_bcount_t rpc_size)
304 {
 /*
  * Requires a non-NULL domain with a non-NULL transport (one further
  * line is elided from this listing), then returns whatever the
  * transport's xo_rpc_max_msg_size() operation reports for ndom and
  * rpc_size.
  */
305  M0_PRE(ndom != NULL);
306  M0_PRE(ndom->nd_xprt != NULL);
308 
309  return ndom->nd_xprt->nx_ops->xo_rpc_max_msg_size(ndom, rpc_size);
310 }
311 
/*
 * Returns the value reported by the transport's xo_rpc_max_recv_msgs()
 * operation for @ndom and @rpc_size.
 *
 * @param ndom      network domain; must be non-NULL and have a non-NULL
 *                  nd_xprt (one further line is elided from this listing).
 * @param rpc_size  forwarded unchanged to the transport operation.
 * @return          result of
 *                  ndom->nd_xprt->nx_ops->xo_rpc_max_recv_msgs(ndom, rpc_size).
 */
312 M0_INTERNAL uint32_t m0_rpc_max_recv_msgs(struct m0_net_domain *ndom,
313  m0_bcount_t rpc_size)
314 {
315  M0_PRE(ndom != NULL);
316  M0_PRE(ndom->nd_xprt != NULL);
318 
319  return ndom->nd_xprt->nx_ops->xo_rpc_max_recv_msgs(ndom, rpc_size);
320 }
321 
323 {
325 }
326 
328 #undef M0_TRACE_SUBSYSTEM
329 
330 /*
331  * Local variables:
332  * c-indentation-style: "K&R"
333  * c-basic-offset: 8
334  * tab-width: 8
335  * fill-column: 80
336  * scroll-step: 1
337  * End:
338  */
static m0_bcount_t seg_size
Definition: net.c:118
M0_INTERNAL m0_bcount_t m0_rpc_session_get_max_item_size(const struct m0_rpc_session *session)
Definition: session.c:767
M0_INTERNAL uint32_t m0_rpc_max_segs_nr(struct m0_net_domain *ndom)
Definition: rpc.c:293
const struct m0_net_xprt_ops * nx_ops
Definition: net.h:126
M0_INTERNAL void m0_net_buffer_pool_fini(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:154
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
uint32_t rit_opcode
Definition: item.h:474
m0_time_t ri_resend_interval
Definition: item.h:144
#define M0_PRE(cond)
M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:169
M0_INTERNAL bool m0_rpc_conn_is_rcv(const struct m0_rpc_conn *conn)
Definition: conn.c:288
static struct m0_net_buffer_pool app_pool
Definition: reqh_fom_ut.c:167
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
#define NULL
Definition: misc.h:38
M0_INTERNAL uint32_t m0_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: rpc.c:312
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
M0_INTERNAL int m0_rpc_item_module_init(void)
Definition: item.c:111
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:203
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
Definition: item.c:742
uint64_t osr_session_id
Definition: onwire.h:97
const struct m0_net_buffer_pool_ops * nbp_ops
Definition: buffer_pool.h:263
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
int32_t ri_error
Definition: item.h:161
static int error
Definition: mdstore.c:64
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL const char * item_kind(const struct m0_rpc_item *item)
Definition: item.c:356
M0_INTERNAL m0_time_t m0_rpc__down_timeout(void)
Definition: rpc.c:322
static struct m0_rpc_session session
Definition: formation2.c:38
#define TIME_P(t)
Definition: time.h:45
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL void m0_net_domain_buffer_pool_not_empty(struct m0_net_buffer_pool *pool)
Definition: tm_provision.c:484
struct m0_rpc_chan * c_rpcchan
Definition: conn.h:317
uint32_t(* xo_rpc_max_recv_msgs)(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.h:369
M0_INTERNAL bool m0_rpc_session_is_cancelled(struct m0_rpc_session *session)
Definition: session.c:875
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
Definition: item.c:509
M0_INTERNAL bool m0_rpc_session_invariant(const struct m0_rpc_session *session)
Definition: session.c:155
return M0_RC(rc)
m0_time_t ri_rpc_time
Definition: item.h:202
#define M0_ASSERT_EX(cond)
M0_INTERNAL void m0_rpc_item_send_reply(struct m0_rpc_item *req, struct m0_rpc_item *reply)
Definition: item.c:1503
#define M0_ENTRY(...)
Definition: trace.h:170
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
Definition: item.c:470
M0_INTERNAL void m0_rpc_session_module_fini(void)
Definition: session_utils.c:52
uint64_t osr_sender_id
Definition: onwire.h:96
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:565
#define TIME_F
Definition: time.h:44
M0_INTERNAL int m0_rpc_reply_timedwait(struct m0_clink *clink, const m0_time_t timeout)
Definition: rpc.c:205
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
uint64_t c_sender_id
Definition: conn.h:269
M0_INTERNAL void m0_rpc_item_module_fini(void)
Definition: item.c:137
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
Definition: item.c:704
#define M0_ASSERT(cond)
const struct m0_net_xprt * nd_xprt
Definition: net.h:396
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
Definition: item.c:523
M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
Definition: item.c:1129
m0_time_t m0_time_now(void)
Definition: time.c:134
M0_INTERNAL m0_bcount_t m0_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: rpc.c:302
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
M0_INTERNAL int m0_rpc_net_buffer_pool_setup(struct m0_net_domain *ndom, struct m0_net_buffer_pool *app_pool, uint32_t bufs_nr, uint32_t tm_nr)
Definition: rpc.c:229
uint32_t rm_min_recv_size
Definition: rpc_machine.h:148
M0_INTERNAL uint32_t m0_rpc_bufs_nr(uint32_t len, uint32_t tms_nr)
Definition: rpc.c:271
void(* nbpo_not_empty)(struct m0_net_buffer_pool *)
Definition: buffer_pool.h:150
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:186
M0_INTERNAL int session_state(const struct m0_rpc_session *session)
Definition: session.c:141
static const struct m0_net_buffer_pool_ops b_ops
Definition: rpc.c:224
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
struct m0_uint128 osr_uuid
Definition: onwire.h:95
struct m0_rpc_frm rc_frm
M0_INTERNAL struct m0_rpc_machine * session_machine(const struct m0_rpc_session *s)
Definition: session.c:147
uint64_t ri_cookid
Definition: item.h:190
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
Definition: rpc.c:135
struct m0_rpc_conn conn
Definition: fsync.c:96
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_rpc_machine machine
Definition: mdstore.c:58
M0_INTERNAL int m0_rpc_session_module_init(void)
Definition: session_utils.c:47
struct m0_cookie osr_cookie
Definition: onwire.h:106
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
Definition: formation2.c:260
struct m0_uint128 c_uuid
Definition: conn.h:272
static uint32_t timeout
Definition: console.c:52
M0_INTERNAL bool m0_rpc_machine_is_not_locked(const struct m0_rpc_machine *machine)
Definition: rpc_machine.c:573
M0_INTERNAL void m0_rpc_service_unregister(void)
Definition: service.c:134
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL int m0_rpc_init(void)
Definition: rpc.c:42
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
Definition: chan.c:349
static void rpc_buffer_pool_low(struct m0_net_buffer_pool *bp)
Definition: rpc.c:219
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
static uint32_t max32u(uint32_t a, uint32_t b)
Definition: arith.h:61
struct m0_rpc_session * ri_session
Definition: item.h:147
m0_bcount_t(* xo_rpc_max_seg_size)(struct m0_net_domain *ndom)
Definition: net.h:362
M0_INTERNAL int m0_net_buffer_pool_provision(struct m0_net_buffer_pool *pool, uint32_t buf_nr)
Definition: buffer_pool.c:125
m0_bcount_t(* xo_rpc_max_msg_size)(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.h:366
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL void m0_rpc_fini(void)
Definition: rpc.c:51
static struct bulkio_params * bp
Definition: bulkio_ut.c:44
void m0_rpc_net_buffer_pool_cleanup(struct m0_net_buffer_pool *app_pool)
Definition: rpc.c:264
M0_INTERNAL void m0_rpc_oneway_item_post_locked(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:184
uint32_t ri_nr_sent
Definition: item.h:183
M0_INTERNAL bool m0_rpc_conn_is_snd(const struct m0_rpc_conn *conn)
Definition: conn.c:280
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
uint64_t s_session_id
Definition: session.h:309
M0_INTERNAL m0_bcount_t m0_rpc_max_seg_size(struct m0_net_domain *ndom)
Definition: rpc.c:284
M0_INTERNAL int m0_net_buffer_pool_init(struct m0_net_buffer_pool *pool, struct m0_net_domain *ndom, uint32_t threshold, uint32_t seg_nr, m0_bcount_t seg_size, uint32_t colours, unsigned shift, bool dont_dump)
Definition: buffer_pool.c:82
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_rpc_service_register(void)
Definition: service.c:128
M0_INTERNAL int m0_rpc__post_locked(struct m0_rpc_item *item)
Definition: rpc.c:83
struct m0_rpc_conn * s_conn
Definition: session.h:312
static struct m0_rpc_conn * item2conn(const struct m0_rpc_item *item)
Definition: rpc_internal.h:95
uint32_t(* xo_rpc_max_segs_nr)(struct m0_net_domain *ndom)
Definition: net.h:364
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)
Definition: ktime.c:73