Motr  M0
conn_pool.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2017-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 #include "lib/string.h"
28 #include "lib/errno.h"
29 #include "lib/finject.h" /* M0_FI_ENABLED */
30 #include "rpc/session.h"
31 #include "rpc/link.h"
32 #include "rpc/conn_pool.h"
33 #include "rpc/conn_pool_internal.h"
34 #include "rpc/rpc_machine_internal.h" /* m0_rpc_chan */
35 
36 M0_TL_DESCR_DEFINE(rpc_conn_pool_items,
37  "rpc cpi list",
38  M0_INTERNAL,
39  struct m0_rpc_conn_pool_item,
40  cpi_linkage, cpi_magic,
43 
44 M0_TL_DEFINE(rpc_conn_pool_items, M0_INTERNAL, struct m0_rpc_conn_pool_item);
45 
47  struct m0_rpc_conn_pool *pool,
48  const char *remote_ep)
49 {
50  struct m0_rpc_conn_pool_item *ret = NULL;
51  struct m0_rpc_conn_pool_item *pool_item;
52 
53  M0_ENTRY();
54  M0_PRE(m0_mutex_is_locked(&pool->cp_mutex));
55  m0_tl_for(rpc_conn_pool_items, &pool->cp_items, pool_item) {
56  if (!strcmp(m0_rpc_conn_addr(&pool_item->cpi_rpc_link.rlk_conn),
57  remote_ep))
58  {
59  ret = pool_item;
60  break;
61  }
62  } m0_tl_endfor;
63 
64  M0_LEAVE("ret %p", ret);
65 
66  return ret;
67 }
68 
70  struct m0_rpc_session *session)
71 {
72  struct m0_rpc_conn_pool_item *ret;
73  struct m0_rpc_link *rpc_link;
74 
75  M0_ENTRY();
76 
77  rpc_link = container_of(session, struct m0_rpc_link, rlk_sess);
78  ret = container_of(rpc_link,
79  struct m0_rpc_conn_pool_item, cpi_rpc_link);
80 
81  M0_LEAVE("ret %p", ret);
82 
83  return ret;
84 }
85 
86 static bool pool_item_clink_cb(struct m0_clink *link)
87 {
88  struct m0_rpc_conn_pool_item *pool_item;
89  struct m0_rpc_conn_pool *pool;
90 
91  M0_ENTRY("link %p", link);
92 
93  pool_item = container_of(link,
94  struct m0_rpc_conn_pool_item, cpi_clink);
95  m0_chan_broadcast_lock(&pool_item->cpi_chan);
96  pool = pool_item->cpi_pool;
97  m0_mutex_lock(&pool->cp_mutex);
98  pool_item->cpi_connecting = false;
99  m0_mutex_unlock(&pool->cp_mutex);
100 
101  M0_LEAVE();
102  return true;
103 }
104 
106  struct m0_rpc_conn_pool *pool,
107  struct m0_rpc_conn_pool_item *item,
108  const char *remote_ep)
109 {
110  int rc;
111 
112  M0_PRE(m0_mutex_is_locked(&pool->cp_mutex));
113 
114  rc = m0_rpc_link_init(&item->cpi_rpc_link, pool->cp_rpc_mach,
115  NULL, remote_ep, pool->cp_max_rpcs_in_flight);
116 
117  if (rc == 0) {
118  m0_chan_init(&item->cpi_chan, &pool->cp_ch_mutex);
119 
120  m0_clink_init(&item->cpi_clink, pool_item_clink_cb);
121  item->cpi_clink.cl_is_oneshot = true;
122 
123  rpc_conn_pool_items_tlink_init_at_tail(item, &pool->cp_items);
124  item->cpi_pool = pool;
125  }
126 
127  return rc;
128 }
129 
131 {
132  M0_PRE(item->cpi_pool != NULL);
133  M0_PRE(m0_mutex_is_locked(&item->cpi_pool->cp_mutex));
134  m0_clink_fini(&item->cpi_clink);
135  m0_chan_fini_lock(&item->cpi_chan);
136  m0_rpc_link_fini(&item->cpi_rpc_link);
137  if (rpc_conn_pool_items_tlink_is_in(item))
138  rpc_conn_pool_items_tlink_del_fini(item);
139  m0_free(item);
140 }
141 
143  struct m0_rpc_conn_pool *pool,
144  const char *remote_ep)
145 {
146  struct m0_rpc_conn_pool_item *item;
147  int rc;
148 
149  M0_PRE(m0_mutex_is_locked(&pool->cp_mutex));
150 
151  if (M0_FI_ENABLED("fail_conn_get"))
152  return NULL;
153 
154  /*
155  * @todo Implement invariant function for pool and call
156  * here. (phase 2)
157  */
158  M0_PRE(pool != NULL && pool->cp_rpc_mach != NULL);
159  M0_ASSERT(remote_ep != NULL);
160 
161  M0_ENTRY("pool %p, remote_ep %s", pool, remote_ep);
162 
163  item = find_item_by_ep(pool, remote_ep);
164 
165  if (item == NULL) {
166  /* Add next item */
168 
169  if (item == NULL) {
171  "Could not allocate new connection pool item");
172  goto conn_pool_item_get_leave;
173  }
174 
175  rc = conn_pool_item_init(pool, item, remote_ep);
176 
177  if (rc != 0) {
178  m0_free(item);
179  item = NULL;
180  }
181  }
182 
183  if (item != NULL) {
184  /*
185  * @todo Is it necessary to do something
186  * when item->cpi_users_nr becomes zero?
187  */
188  M0_CNT_INC(item->cpi_users_nr);
189  }
190 
191 conn_pool_item_get_leave:
192  M0_LEAVE("item %p", item);
193 
194  return item;
195 }
196 
198  struct m0_rpc_conn_pool *pool,
199  const char *remote_ep,
200  struct m0_rpc_session **session)
201 {
202  struct m0_clink clink;
203  int rc;
204 
206  pool,
207  remote_ep,
208  session);
209 
210  if (rc == -EBUSY) {
212  clink.cl_is_oneshot = true;
214  &clink);
217  rc = find_pool_item(*session)->cpi_rpc_link.rlk_rc;
218  }
219 
220  if (rc != 0 && *session) {
222  "conn error %s", remote_ep);
224  pool, *session);
225  }
226 
227  return M0_RC(rc);
228 }
229 
231  struct m0_rpc_conn_pool *pool,
232  const char *remote_ep,
233  struct m0_rpc_session **session)
234 {
235  struct m0_rpc_conn_pool_item *item;
236  int rc = 0;
237  struct m0_rpc_link *rpc_link;
238 
239  if (M0_FI_ENABLED("fail_conn_get")) {
240  *session = NULL;
241  return -ENOMEM;
242  }
243 
244  m0_mutex_lock(&pool->cp_mutex);
245  M0_LOG(M0_DEBUG, "con pool mutex locked");
246 
247  item = conn_pool_item_get(pool, remote_ep);
248  *session = (item != NULL) ? &item->cpi_rpc_link.rlk_sess : NULL;
249 
250  if (item != NULL) {
251  if (item->cpi_connecting) {
252  rc = -EBUSY;
259  rpc_link = &item->cpi_rpc_link;
260  if (rpc_link->rlk_connected) {
262  pool->cp_timeout == M0_TIME_NEVER ?
263  M0_TIME_NEVER :
264  m0_time_now() + pool->cp_timeout);
265  }
266  item->cpi_connecting = true;
267  rpc_link->rlk_rc = 0;
268  M0_LOG(M0_DEBUG, "ASYNC CONN to %s", remote_ep);
269 
271  rpc_link,
272  pool->cp_timeout == M0_TIME_NEVER ?
273  M0_TIME_NEVER :
274  m0_time_now() + pool->cp_timeout,
275  &item->cpi_clink);
276  rc = -EBUSY;
277  }
278  } else {
279  rc = -ENOMEM;
280  }
281 
282  m0_mutex_unlock(&pool->cp_mutex);
283  M0_LOG(M0_DEBUG, "con pool mutex unlocked");
284 
285  return M0_RC(rc);
286 }
287 
288 M0_INTERNAL void m0_rpc_conn_pool_put(
289  struct m0_rpc_conn_pool *pool,
290  struct m0_rpc_session *session)
291 {
292  M0_LOG(M0_DEBUG, "m0_rpc_conn_pool_put");
293 
294  m0_mutex_lock(&pool->cp_mutex);
295  M0_LOG(M0_DEBUG, "con pool mutex locked");
296 
297  /* @todo Do we do anything when counter is zero? */
298  M0_CNT_DEC(find_pool_item(session)->cpi_users_nr);
299 
300  m0_mutex_unlock(&pool->cp_mutex);
301  M0_LOG(M0_DEBUG, "con pool mutex unlocked");
302 }
303 
305  struct m0_rpc_session *session)
306 {
307  return &find_pool_item(session)->cpi_chan;
308 }
309 
311  struct m0_rpc_session *session)
312 {
313  struct m0_rpc_link *rpc_link;
314  uint32_t sess_state;
315 
316  rpc_link = container_of(session, struct m0_rpc_link, rlk_sess);
317 
318  /* @todo Unprotected access to ->sm_state. */
319  sess_state = rpc_link->rlk_sess.s_sm.sm_state;
320  M0_LOG(M0_DEBUG, "session state %d", sess_state);
321 
322  return M0_IN(sess_state, (M0_RPC_SESSION_IDLE, M0_RPC_SESSION_BUSY));
323 }
324 
325 M0_INTERNAL int m0_rpc_conn_pool_init(
326  struct m0_rpc_conn_pool *pool,
327  struct m0_rpc_machine *rpc_mach,
328  m0_time_t conn_timeout,
329  uint64_t max_rpcs_in_flight)
330 {
331  M0_ENTRY("pool %p", pool);
332 
333  M0_ASSERT(rpc_mach != NULL);
334  M0_PRE(M0_IS0(pool));
335 
336  pool->cp_rpc_mach = rpc_mach;
337  pool->cp_timeout = conn_timeout;
338  pool->cp_max_rpcs_in_flight = max_rpcs_in_flight;
339  m0_mutex_init(&pool->cp_mutex);
340  m0_mutex_init(&pool->cp_ch_mutex);
341  rpc_conn_pool_items_tlist_init(&pool->cp_items);
342  return M0_RC(0);
343 }
344 
345 M0_INTERNAL void m0_rpc_conn_pool_fini(struct m0_rpc_conn_pool *pool)
346 {
347  struct m0_rpc_conn_pool_item *item;
348 
349  M0_ENTRY();
350 
351  m0_mutex_lock(&pool->cp_mutex);
352  M0_LOG(M0_DEBUG, "con pool mutex locked");
353 
354  m0_tl_for(rpc_conn_pool_items, &pool->cp_items, item) {
356  &item->cpi_rpc_link.rlk_sess)) {
358  &item->cpi_rpc_link,
359  pool->cp_timeout == M0_TIME_NEVER ?
360  M0_TIME_NEVER :
361  m0_time_now() + pool->cp_timeout);
362  }
363 
365  } m0_tl_endfor;
366  rpc_conn_pool_items_tlist_fini(&pool->cp_items);
367  m0_mutex_unlock(&pool->cp_mutex);
368  M0_LOG(M0_DEBUG, "con pool mutex unlocked");
369 
370  m0_mutex_fini(&pool->cp_mutex);
371  m0_mutex_fini(&pool->cp_ch_mutex);
372 
373  M0_LEAVE();
374 }
375 
377  struct m0_sm_ast *ast)
378 {
380 
381  M0_LOG(M0_DEBUG, "item=%p got freed", item);
382  m0_rpc_link_fini(&item->cpi_rpc_link);
383  m0_free(item);
384 }
385 
386 static bool pool_item_disconn_cb(struct m0_clink *link)
387 {
388  struct m0_rpc_conn_pool_item *item;
389  struct m0_rpc_conn_pool *pool;
390  struct m0_rpc_session *session;
391  struct m0_sm_ast *ast;
392  struct m0_locality *loc = m0_locality0_get();
393 
394  M0_ENTRY("link %p", link);
395 
396  item = container_of(link, struct m0_rpc_conn_pool_item, cpi_clink);
397  pool = item->cpi_pool;
398  session = &item->cpi_rpc_link.rlk_sess;
399 
400  m0_mutex_lock(&pool->cp_mutex);
401  M0_LOG(M0_DEBUG, "session established=%d rpc connected=%d",
403  !!item->cpi_rpc_link.rlk_connected);
404 
405  m0_clink_fini(&item->cpi_clink);
406  m0_chan_fini_lock(&item->cpi_chan);
407  M0_LOG(M0_DEBUG, "Going to free item=%p in an AST", item);
408 
409  ast = &pool->cp_ast;
411  ast->sa_datum = item;
412  m0_sm_ast_post(loc->lo_grp, ast);
413 
414  m0_mutex_unlock(&pool->cp_mutex);
415 
416  M0_LEAVE();
417  return true;
418 }
419 
421  struct m0_sm_ast *ast)
422 {
423  struct m0_rpc_conn_pool_item *item;
424  struct m0_rpc_session *session;
425 
426  item = ast->sa_datum;
427  session = &item->cpi_rpc_link.rlk_sess;
428  /* disconnect the rpc link */
429  M0_LOG(M0_DEBUG, "Async disconnect this rpc. item=%p", item);
431  item->cpi_rpc_link.rlk_connected){
434  item->cpi_clink.cl_is_oneshot = true;
435  m0_rpc_link_disconnect_async(&item->cpi_rpc_link,
436  m0_time_from_now(10, 0),
437  &item->cpi_clink);
438  }
439 }
440 
441 
451  struct m0_rpc_session *session)
452 {
453  struct m0_rpc_conn_pool_item *item;
454  struct m0_sm_ast *ast;
455  struct m0_locality *loc = m0_locality0_get();
456 
457  M0_ENTRY();
458 
459  m0_mutex_lock(&pool->cp_mutex);
460 
461  m0_tl_for(rpc_conn_pool_items, &pool->cp_items, item) {
462  if (&item->cpi_rpc_link.rlk_sess != session)
463  continue;
464 
465  /* remove this item from pool. */
466  rpc_conn_pool_items_tlink_del_fini(item);
467  break;
468  } m0_tl_endfor;
469 
470  if (item != NULL) {
471  /* Posting an AST to dosconnect this rpc */
472  ast = &pool->cp_ast;
474  ast->sa_datum = item;
475  m0_sm_ast_post(loc->lo_grp, ast);
476  }
477  m0_mutex_unlock(&pool->cp_mutex);
478 
479  M0_LEAVE();
480 }
481 
482 
483 #undef M0_TRACE_SUBSYSTEM
484 
485 /*
486  * Local variables:
487  * c-indentation-style: "K&R"
488  * c-basic-offset: 8
489  * tab-width: 8
490  * fill-column: 80
491  * scroll-step: 1
492  * End:
493  */
494 /*
495  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
496  */
M0_INTERNAL void m0_rpc_conn_pool_destroy(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
Definition: conn_pool.c:450
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
M0_INTERNAL void m0_rpc_conn_pool_fini(struct m0_rpc_conn_pool *pool)
Definition: conn_pool.c:345
struct m0_rpc_link cpi_rpc_link
Definition: conn_pool.h:36
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
static void pool_item_disconnected_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: conn_pool.c:376
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
static struct m0_sm_group * grp
Definition: bytecount.c:38
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL int m0_rpc_conn_pool_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_machine *rpc_mach, m0_time_t conn_timeout, uint64_t max_rpcs_in_flight)
Definition: conn_pool.c:325
Definition: sm.h:504
#define container_of(ptr, type, member)
Definition: misc.h:33
static struct m0_rpc_session session
Definition: formation2.c:38
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static bool pool_item_disconn_cb(struct m0_clink *link)
Definition: conn_pool.c:386
static void conn_pool_item_fini(struct m0_rpc_conn_pool_item *item)
Definition: conn_pool.c:130
static struct m0_rpc_item * item
Definition: item.c:56
static void pool_item_disconn_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: conn_pool.c:420
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
Definition: conn.c:1306
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_TL_DEFINE(rpc_conn_pool_items, M0_INTERNAL, struct m0_rpc_conn_pool_item)
void * sa_datum
Definition: sm.h:508
if(value==NULL)
Definition: dir.c:350
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
m0_time_t m0_time_now(void)
Definition: time.c:134
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
Definition: session.c:850
struct m0_rpc_conn_pool * cpi_pool
Definition: conn_pool.h:42
static struct m0_rpc_conn_pool_item * find_pool_item(struct m0_rpc_session *session)
Definition: conn_pool.c:69
M0_INTERNAL struct m0_chan * m0_rpc_conn_pool_session_chan(struct m0_rpc_session *session)
Definition: conn_pool.c:304
M0_TL_DESCR_DEFINE(rpc_conn_pool_items, "rpc cpi list", M0_INTERNAL, struct m0_rpc_conn_pool_item, cpi_linkage, cpi_magic, M0_RPC_CONN_POOL_ITEMS_MAGIC, M0_RPC_CONN_POOL_ITEMS_HEAD_MAGIC)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
struct m0_sm_group * lo_grp
Definition: locality.h:67
Definition: chan.h:229
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL bool m0_rpc_conn_pool_session_established(struct m0_rpc_session *session)
Definition: conn_pool.c:310
static struct m0_rpc_conn_pool_item * find_item_by_ep(struct m0_rpc_conn_pool *pool, const char *remote_ep)
Definition: conn_pool.c:46
struct m0_sm s_sm
Definition: session.h:325
static struct m0_pool pool
Definition: iter_ut.c:58
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
static struct m0_rpc_conn_pool_item * conn_pool_item_get(struct m0_rpc_conn_pool *pool, const char *remote_ep)
Definition: conn_pool.c:142
#define M0_CNT_INC(cnt)
Definition: arith.h:226
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
#define M0_IS0(obj)
Definition: misc.h:70
M0_INTERNAL int m0_rpc_conn_pool_get_async(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
Definition: conn_pool.c:230
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
static bool pool_item_clink_cb(struct m0_clink *link)
Definition: conn_pool.c:86
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
M0_INTERNAL int m0_rpc_conn_pool_get_sync(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
Definition: conn_pool.c:197
struct m0_chan cpi_chan
Definition: conn_pool.h:37
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
uint32_t sm_state
Definition: sm.h:307
int32_t rc
Definition: trigger_fop.h:47
static int conn_pool_item_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_conn_pool_item *item, const char *remote_ep)
Definition: conn_pool.c:105
M0_INTERNAL void m0_rpc_conn_pool_put(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
Definition: conn_pool.c:288