Motr  M0
service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
24 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
25 #include "lib/errno.h"
26 #include "lib/trace.h"
27 #include "lib/tlist.h"
28 #include "lib/time.h" /* m0_time_from_now */
29 #include "lib/bob.h"
30 #include "lib/memory.h" /* m0_alloc() */
31 #include "rpc/link.h"
32 #include "rpc/service.h"
33 #include "rpc/rev_conn.h"
34 #include "rpc/rpc_internal.h"
35 #include "reqh/reqh.h"
36 #include "reqh/reqh_service.h"
37 #include "ha/note_fops.h" /* m0_ha_state_fop_init */
38 
45 M0_TL_DESCR_DEFINE(rev_conn, "Reverse Connections", static,
46  struct m0_reverse_connection, rcf_linkage, rcf_magic,
48 M0_TL_DEFINE(rev_conn, static, struct m0_reverse_connection);
49 
50 static const struct m0_bob_type rpc_svc_bob = {
51  .bt_name = "rpc service",
52  .bt_magix_offset = offsetof(struct m0_rpc_service, rps_magix),
53  .bt_magix = M0_RPC_SERVICE_MAGIC,
54  .bt_check = NULL
55 };
57 
59 {
60  struct m0_rpc_service *svc;
61 
63  rev_conn_tlist_init(&svc->rps_rev_conns);
64 
65  return 0;
66 }
67 
69 {
70  struct m0_rpc_service *svc;
71 
74  rev_conn_tlist_fini(&svc->rps_rev_conns);
75 }
76 
78 {
79  struct m0_rpc_service *svc;
80 
82  m0_rpc_service_bob_fini(svc);
83  m0_free(svc);
84 }
85 
86 static int
88 {
89  return 0;
90 }
91 
92 static const struct m0_reqh_service_ops rpc_ops = {
94  .rso_stop = rpc_service_stop,
95  .rso_fini = rpc_service_fini,
96  .rso_fop_accept = rpc_service_fop_accept
97 };
98 
100  const struct m0_reqh_service_type *stype)
101 {
102  struct m0_rpc_service *svc;
103 
104  M0_PRE(stype != NULL && service != NULL);
105 
106  M0_ALLOC_PTR(svc);
107  if (svc == NULL)
108  return M0_ERR(-ENOMEM);
109 
110  m0_rpc_service_bob_init(svc);
111  svc->rps_svc.rs_ops = &rpc_ops;
112  *service = &svc->rps_svc;
113  return 0;
114 }
115 
118 };
119 
121  .rst_name = "rpcservice",
122  .rst_ops = &rpc_service_type_ops,
123  .rst_level = M0_RPC_SVC_LEVEL,
124  .rst_keep_alive = true,
125 };
126 M0_EXPORTED(m0_rpc_service_type);
127 
128 M0_INTERNAL int m0_rpc_service_register(void)
129 {
132 }
133 
134 M0_INTERNAL void m0_rpc_service_unregister(void)
135 {
138 }
139 
140 M0_INTERNAL struct m0_rpc_session *
142  const struct m0_rpc_item *item)
143 {
144 
145  struct m0_reverse_connection *revc;
146  struct m0_rpc_service *svc;
147  const struct m0_rpc_item_header2 *header;
148 
150  header = &item->ri_header;
151  svc = bob_of(service, struct m0_rpc_service, rps_svc, &rpc_svc_bob);
152  if (header->osr_sender_id == SENDER_ID_INVALID)
153  revc = m0_tl_find(rev_conn, revc, &svc->rps_rev_conns,
155  &header->osr_uuid) == 0);
156  else
157  revc = m0_tl_find(rev_conn, revc, &svc->rps_rev_conns,
159  header->osr_sender_id);
160  return revc == NULL ? NULL : &revc->rcf_rlink.rlk_sess;
161 }
162 
163 M0_INTERNAL int
165  const struct m0_rpc_item *item,
166  struct m0_clink *clink,
167  struct m0_rpc_session **session)
168 {
169  int rc;
170  const char *rem_ep;
171  struct m0_reverse_connection *revc;
172  struct m0_rpc_service *svc;
173 
174  M0_ENTRY();
176 
179 
180  M0_ALLOC_PTR(revc);
181  rc = revc == NULL ? -ENOMEM : 0;
182  rc = rc ?: m0_rpc_link_init(&revc->rcf_rlink,
183  item->ri_rmachine, NULL, rem_ep,
185  if (rc == 0) {
188  clink);
189  *session = &revc->rcf_rlink.rlk_sess;
190  rev_conn_tlink_init_at_tail(revc, &svc->rps_rev_conns);
191  }
192  return M0_RC(rc);
193 }
194 
195 static void rev_conn_free(struct m0_sm_group *grp, struct m0_sm_ast *ast)
196 {
197  struct m0_reverse_connection *revc = ast->sa_datum;
198 
199  M0_ENTRY("revc=%p", revc);
200  m0_rpc_link_fini(&revc->rcf_rlink);
201  rev_conn_tlink_del_fini(revc);
202  m0_free(revc);
203  M0_LEAVE("revc=%p", revc);
204 }
205 
206 static bool rev_conn_disconnected_cb(struct m0_clink *link)
207 {
208  struct m0_reverse_connection *revc;
209 
210  revc = container_of(link, struct m0_reverse_connection, rcf_disc_wait);
211  revc->rcf_free_ast = (struct m0_sm_ast){
212  .sa_cb = &rev_conn_free,
213  .sa_datum = revc,
214  };
215  m0_sm_ast_post(m0_locality0_get()->lo_grp, &revc->rcf_free_ast);
216  return true;
217 }
218 
219 M0_INTERNAL void
221 {
222  struct m0_rpc_link *rlk;
223  struct m0_reverse_connection *revc;
224 
225  M0_ENTRY();
226 
227  rlk = container_of(sess, struct m0_rpc_link, rlk_sess);
228  revc = container_of(rlk, struct m0_reverse_connection, rcf_rlink);
229 
230  M0_SET0(&revc->rcf_disc_wait);
231  if (revc->rcf_rlink.rlk_connected) {
233  revc->rcf_disc_wait.cl_is_oneshot = true;
236  &revc->rcf_disc_wait);
237  }
238 
239  M0_LEAVE();
240 }
241 
242 M0_INTERNAL void
244 {
245  struct m0_rpc_service *svc;
246  struct m0_reverse_connection *revc;
247 
248  M0_ENTRY();
250 
251  /*
252  * We take the lock here on locality0 to avoid races with
253  * rev_conn_free() AST which runs in locality0.
254  */
256  svc = bob_of(service, struct m0_rpc_service, rps_svc, &rpc_svc_bob);
257  m0_tl_for(rev_conn, &svc->rps_rev_conns, revc) {
258  M0_SET0(&revc->rcf_disc_wait);
259  if (revc->rcf_rlink.rlk_connected) {
261  revc->rcf_disc_wait.cl_is_oneshot = true;
264  &revc->rcf_disc_wait);
265  }
266  } m0_tlist_endfor;
267  m0_tl_teardown(rev_conn, &svc->rps_rev_conns, revc) {
268  if (revc->rcf_disc_wait.cl_group != NULL) {
269  m0_chan_wait(&revc->rcf_disc_wait);
271  }
272  m0_rpc_link_fini(&revc->rcf_rlink);
273  rev_conn_tlink_fini(revc);
274  m0_free(revc);
275  }
277 
278  M0_LEAVE();
279 }
280 
282 {
283  return container_of(session, struct m0_rpc_link, rlk_sess)->rlk_rc;
284 }
285 
286 M0_INTERNAL struct m0_reqh_service *
288 {
290 }
291 
292 M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
293 {
294  return reqh->rh_rpc_service == NULL ?
297  0;
298 }
299 
300 M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
301 {
302  if (m0_reqh_rpc_mach_tlist_is_empty(&reqh->rh_rpc_machines)) {
305  }
306 }
307 
308 #undef M0_TRACE_SUBSYSTEM
309 
311 /*
312  * Local variables:
313  * c-indentation-style: "K&R"
314  * c-basic-offset: 8
315  * tab-width: 8
316  * fill-column: 80
317  * scroll-step: 1
318  * End:
319  */
M0_INTERNAL int m0_uint128_cmp(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:45
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
static void rev_conn_free(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: service.c:195
int(* rso_start)(struct m0_reqh_service *service)
Definition: reqh_service.h:360
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_ha_state_fop_fini(void)
Definition: note_fops.c:37
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
static int rpc_service_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: service.c:99
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_LEAVE()
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL int m0_ha_state_fop_init(void)
Definition: note_fops.c:44
static void rpc_service_fini(struct m0_reqh_service *service)
Definition: service.c:77
M0_TL_DESCR_DEFINE(dopr, "dtm0_process", static, struct dtm0_process, dop_link, dop_magic, M0_DTM0_PROC_MAGIC, M0_DTM0_PROC_HEAD_MAGIC)
static const struct m0_reqh_service_type_ops rpc_service_type_ops
Definition: service.c:116
Definition: sm.h:504
#define container_of(ptr, type, member)
Definition: misc.h:33
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_reqh_service * rh_rpc_service
Definition: reqh.h:140
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
const char * bt_name
Definition: bob.h:73
M0_INTERNAL void m0_rpc_service_reverse_sessions_cleanup(struct m0_reqh_service *service)
Definition: service.c:243
static void rpc_service_stop(struct m0_reqh_service *service)
Definition: service.c:68
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
static const struct m0_reqh_service_ops rpc_ops
Definition: service.c:92
M0_INTERNAL void m0_rpc_service_reverse_session_put(struct m0_rpc_session *sess)
Definition: service.c:220
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
uint64_t c_sender_id
Definition: conn.h:269
M0_INTERNAL struct m0_reqh_service * m0_reqh_rpc_service_find(struct m0_reqh *reqh)
Definition: service.c:287
static const struct socktype stype[]
Definition: sock.c:1156
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
M0_INTERNAL void m0_reqh_service_quit(struct m0_reqh_service *svc)
Definition: reqh_service.c:588
M0_INTERNAL int m0_rpc_session_status(struct m0_rpc_session *session)
Definition: service.c:281
struct m0_rpc_item_header2 ri_header
Definition: item.h:193
const char * rst_name
Definition: reqh_service.h:447
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
static bool rev_conn_disconnected_cb(struct m0_clink *link)
Definition: service.c:206
M0_INTERNAL void m0_rpc_service_stop(struct m0_reqh *reqh)
Definition: service.c:300
M0_INTERNAL struct m0_rpc_session * m0_rpc_service_reverse_session_lookup(struct m0_reqh_service *service, const struct m0_rpc_item *item)
Definition: service.c:141
Definition: reqh.h:94
static const struct m0_bob_type rpc_svc_bob
Definition: service.c:50
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_reqh_service_setup(struct m0_reqh_service **out, struct m0_reqh_service_type *stype, struct m0_reqh *reqh, struct m0_reqh_context *rctx, const struct m0_fid *fid)
Definition: reqh_service.c:565
struct m0_reqh_service rps_svc
Definition: service.h:39
M0_TL_DEFINE(dopr, static, struct dtm0_process)
struct m0_uint128 c_uuid
Definition: conn.h:272
struct m0_rpc_link rcf_rlink
Definition: rev_conn.h:45
M0_BOB_DEFINE(static, &dtm0_service_bob, m0_dtm0_service)
M0_INTERNAL void m0_rpc_service_unregister(void)
Definition: service.c:134
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
struct m0_reqh reqh
Definition: rm_foms.c:48
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:435
struct m0_reqh_service_type m0_rpc_service_type
Definition: service.c:120
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:227
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL int m0_rpc_service_reverse_session_get(struct m0_reqh_service *service, const struct m0_rpc_item *item, struct m0_clink *clink, struct m0_rpc_session **session)
Definition: service.c:164
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static struct m0_fop * fop
Definition: item.c:57
#define m0_tlist_endfor
Definition: tlist.h:448
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
static int rpc_service_fop_accept(struct m0_reqh_service *service, struct m0_fop *fop)
Definition: service.c:87
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
struct m0_clink rcf_disc_wait
Definition: rev_conn.h:48
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
struct m0_sm_ast rcf_free_ast
Definition: rev_conn.h:49
void m0_free(void *data)
Definition: memory.c:146
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
M0_INTERNAL int m0_rpc_service_start(struct m0_reqh *reqh)
Definition: service.c:292
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_rpc_service_register(void)
Definition: service.c:128
#define offsetof(typ, memb)
Definition: misc.h:29
static int rpc_service_start(struct m0_reqh_service *service)
Definition: service.c:58
Definition: fop.h:79
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
Definition: item.c:1188