Motr  M0
rpclib.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #ifndef __KERNEL__
27 # include <errno.h> /* errno */
28 # include <stdio.h> /* fopen, fclose */
29 #endif
30 
31 #include "lib/misc.h"
32 #include "lib/types.h"
33 #include "lib/memory.h"
34 #include "lib/assert.h"
35 #include "rpc/rpc.h"
36 #include "net/net.h"
37 #include "net/lnet/lnet.h"
38 #include "fop/fop.h"
39 #include "fop/fom_generic.h" /* m0_rpc_item_generic_reply_rc */
40 #include "rpc/rpclib.h"
41 #include "conf/helpers.h" /* m0_conf_service_locate */
42 
43 #ifndef __KERNEL__
44 # include "reqh/reqh.h"
45 # include "reqh/reqh_service.h"
46 # include "motr/setup.h"
47 #endif
48 
49 #ifndef __KERNEL__
51 {
52  int rc;
53 
54  M0_ENTRY("server_ctx: %p", sctx);
55  M0_PRE(sctx->rsx_argv != NULL && sctx->rsx_argc > 0);
56 
57  /* Open error log file */
58  sctx->rsx_log_file = fopen(sctx->rsx_log_file_name, "w+");
59  if (sctx->rsx_log_file == NULL)
60  return M0_ERR_INFO(errno, "Open of error log file");
61 
64  M0_LOG(M0_DEBUG, "cs_init: rc=%d", rc);
65  if (rc != 0)
66  goto fclose;
67 
69  sctx->rsx_argv);
70  M0_LOG(M0_DEBUG, "cs_setup_env: rc=%d", rc);
71  if (rc != 0)
72  goto error;
73 
75  if (rc == 0)
76  return M0_RC(0);
77 
78 error:
80 fclose:
81  fclose(sctx->rsx_log_file);
82  return M0_RC(rc);
83 }
84 
86 {
87  M0_ENTRY("server_ctx: %p", sctx);
88 
90  fclose(sctx->rsx_log_file);
91 
92  M0_LEAVE();
93 }
94 
95 M0_INTERNAL struct m0_rpc_machine *
97 {
99 }
100 #endif /* !__KERNEL__ */
101 
102 M0_INTERNAL int m0_rpc_client_connect(struct m0_rpc_conn *conn,
103  struct m0_rpc_session *session,
104  struct m0_rpc_machine *rpc_mach,
105  const char *remote_addr,
106  struct m0_fid *svc_fid,
107  uint64_t max_rpcs_in_flight,
108  m0_time_t abs_timeout)
109 {
110  struct m0_net_end_point *ep;
111  int rc;
112 
113  M0_ENTRY("conn=%p session=%p rpc_mach=%p remote_addr=%s",
114  conn, session, rpc_mach, remote_addr);
115 
116  rc = m0_net_end_point_create(&ep, &rpc_mach->rm_tm, remote_addr);
117  if (rc != 0)
118  return M0_RC(rc);
119 
120  rc = m0_rpc_conn_create(conn, svc_fid, ep, rpc_mach, max_rpcs_in_flight,
121  abs_timeout);
123  if (rc != 0)
124  return M0_RC(rc);
125 
126  rc = m0_rpc_session_create(session, conn, abs_timeout);
127  if (rc != 0)
128  (void)m0_rpc_conn_destroy(conn, abs_timeout);
129 
130  return M0_RC(rc);
131 }
132 
133 M0_INTERNAL int
135  struct m0_rpc_session *session,
136  struct m0_rpc_machine *rpc_mach,
137  const char *remote_addr,
139  uint64_t max_rpcs_in_flight,
140  m0_time_t abs_timeout)
141 {
142  struct m0_conf_obj *svc_obj = NULL;
143  struct m0_fid *svc_fid = NULL;
144  int rc;
145 
146  M0_ENTRY();
147  M0_PRE(rpc_mach != NULL);
148 
150  remote_addr, &svc_obj);
151  if (rc != 0)
152  return M0_ERR(rc);
153  if (svc_obj != NULL)
154  svc_fid = &svc_obj->co_id;
156  svc_fid, max_rpcs_in_flight,
157  abs_timeout));
158 }
159 
161 {
162  enum { NR_TM = 1 }; /* number of TMs */
163  int rc;
164 
165  M0_ENTRY("client_ctx: %p", cctx);
166 
169 
173  NR_TM);
174  if (rc != 0)
175  return M0_RC(rc);
176 
177  M0_SET0(&cctx->rcx_reqh);
179  .rhia_dtm = (void*)1,
180  .rhia_mdstore = (void*)1,
181  .rhia_fid = cctx->rcx_fid,
182  );
183  if (rc != 0)
184  goto err;
186 
189  &cctx->rcx_reqh,
193  if (rc != 0) {
196  goto err;
197  }
198 
204  if (rc == 0)
205  return M0_RC(0);
206 
210 err:
212  return M0_RC(rc);
213 }
214 M0_EXPORTED(m0_rpc_client_start);
215 
216 
218 {
220 }
221 M0_EXPORTED(m0_rpc_client_stop);
222 
224  void (*printout)(struct m0_rpc_machine *))
225 {
226  int rc0;
227  int rc1;
228 
229  M0_ENTRY("client_ctx: %p", cctx);
230 
232  if (rc0 != 0)
233  M0_LOG(M0_ERROR, "Failed to terminate session %d", rc0);
234 
236  if (rc1 != 0)
237  M0_LOG(M0_ERROR, "Failed to terminate connection %d", rc1);
238  if (printout != NULL)
239  printout(&cctx->rcx_rpc_machine);
244 
245  return M0_RC(rc0 ?: rc1);
246 }
247 M0_EXPORTED(m0_rpc_client_stop_stats);
248 
250  struct m0_rpc_session *session,
251  const struct m0_rpc_item_ops *ri_ops,
252  m0_time_t deadline,
254 {
255  struct m0_rpc_item *item;
256  int rc;
257 
258  M0_ENTRY("fop: %p, session: %p", fop, session);
259  M0_PRE(session != NULL);
260 
261  item = &fop->f_item;
262  item->ri_ops = ri_ops;
265  item->ri_deadline = deadline;
266 
267  /* Set default value if user hasn't set anything else */
268  if (item->ri_nr_sent_max == ~(uint64_t)0)
270 
271  /*
272  * Add a ref so that the item does not get vanished, say due to
273  * session cancellation, while it is being waited for.
274  */
275  m0_fop_get(fop);
276  rc = m0_rpc_post(item) ?:
280  return M0_RC(rc);
281 }
282 M0_EXPORTED(m0_rpc_post_with_timeout_sync);
283 
285  struct m0_rpc_session *session,
286  const struct m0_rpc_item_ops *ri_ops,
287  m0_time_t deadline)
288 {
289  M0_ENTRY("fop: %p, session: %p", fop, session);
290  M0_PRE(session != NULL);
291 
292  return m0_rpc_post_with_timeout_sync(fop, session, ri_ops, deadline,
293  M0_TIME_NEVER);
294 }
295 M0_EXPORTED(m0_rpc_post_sync);
296 
297 #undef M0_TRACE_SUBSYSTEM
298 
299 /*
300  * Local variables:
301  * c-indentation-style: "K&R"
302  * c-basic-offset: 8
303  * tab-width: 8
304  * fill-column: 80
305  * scroll-step: 1
306  * End:
307  */
struct m0_fid co_id
Definition: obj.h:208
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
#define M0_PRE(cond)
void m0_rpc_machine_fini(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:233
M0_INTERNAL void m0_reqh_services_terminate(struct m0_reqh *reqh)
Definition: reqh.c:675
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
#define NULL
Definition: misc.h:38
m0_conf_service_type
Definition: schema.h:194
int m0_rpc_post_with_timeout_sync(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ri_ops, m0_time_t deadline, m0_time_t timeout)
Definition: rpclib.c:249
#define M0_REQH_INIT(reqh,...)
Definition: reqh.h:262
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
M0_INTERNAL int m0_rpc_session_create(struct m0_rpc_session *session, struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: session.c:352
uint64_t m0_time_t
Definition: time.h:37
char ** rsx_argv
Definition: rpclib.h:77
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
int m0_rpc_server_start(struct m0_rpc_server_ctx *sctx)
Definition: rpclib.c:50
int m0_cs_setup_env(struct m0_motr *cctx, int argc, char **argv)
Definition: setup.c:2972
struct m0_net_buffer_pool rcx_buffer_pool
Definition: rpclib.h:150
M0_INTERNAL int m0_rpc_client_find_connect(struct m0_rpc_conn *conn, struct m0_rpc_session *session, struct m0_rpc_machine *rpc_mach, const char *remote_addr, enum m0_conf_service_type stype, uint64_t max_rpcs_in_flight, m0_time_t abs_timeout)
Definition: rpclib.c:134
void m0_cs_fini(struct m0_motr *cctx)
Definition: setup.c:3029
int m0_rpc_session_destroy(struct m0_rpc_session *session, m0_time_t abs_timeout)
Definition: session.c:559
static struct m0_rpc_client_ctx cctx
Definition: rconfc.c:69
static int error
Definition: mdstore.c:64
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
static const char * remote_addr
Definition: rcv_session.c:38
M0_INTERNAL void m0_reqh_fini(struct m0_reqh *reqh)
Definition: reqh.c:320
M0_INTERNAL struct m0_rpc_machine * m0_motr_to_rmach(struct m0_motr *motr)
Definition: setup.c:196
static struct m0_rpc_item * item
Definition: item.c:56
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
Definition: item.c:824
return M0_RC(rc)
Definition: sock.c:754
FILE * rsx_log_file
Definition: rpclib.h:90
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL int m0_confc_service_find(struct m0_confc *confc, enum m0_conf_service_type stype, const char *ep, struct m0_conf_obj **result)
Definition: helpers.c:304
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
static struct m0_rm_incoming_ops ri_ops
Definition: wlock_helper.c:71
M0_INTERNAL int m0_rpc_conn_create(struct m0_rpc_conn *conn, struct m0_fid *svc_fid, struct m0_net_end_point *ep, struct m0_rpc_machine *rpc_machine, uint64_t max_rpcs_in_flight, m0_time_t abs_timeout)
Definition: conn.c:809
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL int m0_rpc_client_connect(struct m0_rpc_conn *conn, struct m0_rpc_session *session, struct m0_rpc_machine *rpc_mach, const char *remote_addr, struct m0_fid *svc_fid, uint64_t max_rpcs_in_flight, m0_time_t abs_timeout)
Definition: rpclib.c:102
M0_INTERNAL struct m0_confc * m0_reqh2confc(struct m0_reqh *reqh)
Definition: reqh.c:753
static const struct socktype stype[]
Definition: sock.c:1156
const char * rsx_log_file_name
Definition: rpclib.h:81
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
uint32_t rcx_max_rpc_msg_size
Definition: rpclib.h:156
M0_INTERNAL int m0_rpc_net_buffer_pool_setup(struct m0_net_domain *ndom, struct m0_net_buffer_pool *app_pool, uint32_t bufs_nr, uint32_t tm_nr)
Definition: rpc.c:229
struct m0_rpc_conn rcx_connection
Definition: rpclib.h:146
M0_INTERNAL uint32_t m0_rpc_bufs_nr(uint32_t len, uint32_t tms_nr)
Definition: rpc.c:271
int m0_rpc_conn_destroy(struct m0_rpc_conn *conn, m0_time_t abs_timeout)
Definition: conn.c:974
int m0_rpc_client_stop(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:217
M0_INTERNAL int m0_rpc_machine_init(struct m0_rpc_machine *machine, struct m0_net_domain *net_dom, const char *ep_addr, struct m0_reqh *reqh, struct m0_net_buffer_pool *receive_pool, uint32_t colour, m0_bcount_t msg_size, uint32_t queue_len)
Definition: rpc_machine.c:123
struct m0_fop * m0_fop_get(struct m0_fop *fop)
Definition: fop.c:162
int m0_rpc_client_start(struct m0_rpc_client_ctx *cctx)
Definition: rpclib.c:160
struct m0_net_xprt ** rsx_xprts
Definition: rpclib.h:69
struct m0_rpc_item * ri_reply
Definition: item.h:163
static struct m0_rpc_server_ctx sctx
Definition: console.c:88
uint32_t rcx_recv_queue_min_length
Definition: rpclib.h:153
int m0_rpc_post_sync(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ri_ops, m0_time_t deadline)
Definition: rpclib.c:284
uint64_t ri_nr_sent_max
Definition: item.h:146
struct m0_rpc_conn conn
Definition: fsync.c:96
struct m0_fid * rcx_fid
Definition: rpclib.h:161
m0_time_t rcx_abs_timeout
Definition: rpclib.h:159
struct m0_net_domain * rcx_net_dom
Definition: rpclib.h:128
static uint32_t timeout
Definition: console.c:52
uint64_t rcx_max_rpcs_in_flight
Definition: rpclib.h:136
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
M0_INTERNAL void m0_reqh_start(struct m0_reqh *reqh)
Definition: reqh.c:711
int m0_cs_init(struct m0_motr *cctx, struct m0_net_xprt **xprts, size_t xprts_nr, FILE *out, bool mkfs)
Definition: setup.c:2999
char * ep
Definition: sw.h:132
int rsx_xprts_nr
Definition: rpclib.h:71
struct m0_rpc_session rcx_session
Definition: rpclib.h:147
Definition: fid.h:38
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
struct m0_rpc_session * ri_session
Definition: item.h:147
const char * rcx_remote_addr
Definition: rpclib.h:134
int m0_rpc_client_stop_stats(struct m0_rpc_client_ctx *cctx, void(*printout)(struct m0_rpc_machine *))
Definition: rpclib.c:223
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
const char * rcx_local_addr
Definition: rpclib.h:131
int m0_cs_start(struct m0_motr *cctx)
Definition: setup.c:2987
void m0_rpc_server_stop(struct m0_rpc_server_ctx *sctx)
Definition: rpclib.c:85
struct m0_rpc_machine rcx_rpc_machine
Definition: rpclib.h:145
void m0_rpc_net_buffer_pool_cleanup(struct m0_net_buffer_pool *app_pool)
Definition: rpc.c:264
struct m0_reqh rcx_reqh
Definition: rpclib.h:144
struct m0_rpc_item f_item
Definition: fop.h:83
M0_INTERNAL struct m0_rpc_machine * m0_rpc_server_ctx_get_rmachine(struct m0_rpc_server_ctx *sctx)
Definition: rpclib.c:96
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: ep.c:56
struct m0_motr rsx_motr_ctx
Definition: rpclib.h:84
struct m0_reqh * rm_reqh
Definition: rpc_machine.h:105
Definition: fop.h:79
m0_time_t ri_deadline
Definition: item.h:141