23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC 40 cpi_linkage, cpi_magic,
48 const char *remote_ep)
108 const char *remote_ep)
115 NULL, remote_ep,
pool->cp_max_rpcs_in_flight);
121 item->cpi_clink.cl_is_oneshot =
true;
123 rpc_conn_pool_items_tlink_init_at_tail(
item, &
pool->cp_items);
137 if (rpc_conn_pool_items_tlink_is_in(
item))
138 rpc_conn_pool_items_tlink_del_fini(
item);
144 const char *remote_ep)
171 "Could not allocate new connection pool item");
172 goto conn_pool_item_get_leave;
191 conn_pool_item_get_leave:
199 const char *remote_ep,
222 "conn error %s", remote_ep);
232 const char *remote_ep,
251 if (
item->cpi_connecting) {
259 rpc_link = &
item->cpi_rpc_link;
266 item->cpi_connecting =
true;
329 uint64_t max_rpcs_in_flight)
336 pool->cp_rpc_mach = rpc_mach;
337 pool->cp_timeout = conn_timeout;
338 pool->cp_max_rpcs_in_flight = max_rpcs_in_flight;
341 rpc_conn_pool_items_tlist_init(&
pool->cp_items);
356 &
item->cpi_rpc_link.rlk_sess)) {
366 rpc_conn_pool_items_tlist_fini(&
pool->cp_items);
403 !!
item->cpi_rpc_link.rlk_connected);
431 item->cpi_rpc_link.rlk_connected){
434 item->cpi_clink.cl_is_oneshot =
true;
466 rpc_conn_pool_items_tlink_del_fini(
item);
483 #undef M0_TRACE_SUBSYSTEM
M0_INTERNAL void m0_rpc_conn_pool_destroy(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
M0_INTERNAL void m0_rpc_conn_pool_fini(struct m0_rpc_conn_pool *pool)
struct m0_rpc_link cpi_rpc_link
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
static void pool_item_disconnected_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
const m0_time_t M0_TIME_NEVER
M0_INTERNAL int m0_rpc_link_init(struct m0_rpc_link *rlink, struct m0_rpc_machine *mach, struct m0_fid *svc_fid, const char *ep, uint64_t max_rpcs_in_flight)
static struct m0_sm_group * grp
#define M0_LOG(level,...)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL int m0_rpc_conn_pool_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_machine *rpc_mach, m0_time_t conn_timeout, uint64_t max_rpcs_in_flight)
struct m0_rpc_conn rlk_conn
M0_INTERNAL void m0_rpc_link_fini(struct m0_rpc_link *rlink)
#define container_of(ptr, type, member)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
static bool pool_item_disconn_cb(struct m0_clink *link)
static void conn_pool_item_fini(struct m0_rpc_conn_pool_item *item)
static struct m0_rpc_item * item
static void pool_item_disconn_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
static struct m0_sm_ast ast[NR]
M0_TL_DEFINE(rpc_conn_pool_items, M0_INTERNAL, struct m0_rpc_conn_pool_item)
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
m0_time_t m0_time_now(void)
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
struct m0_rpc_conn_pool * cpi_pool
static struct m0_rpc_conn_pool_item * find_pool_item(struct m0_rpc_session *session)
M0_INTERNAL struct m0_chan * m0_rpc_conn_pool_session_chan(struct m0_rpc_session *session)
M0_TL_DESCR_DEFINE(rpc_conn_pool_items, "rpc cpi list", M0_INTERNAL, struct m0_rpc_conn_pool_item, cpi_linkage, cpi_magic, M0_RPC_CONN_POOL_ITEMS_MAGIC, M0_RPC_CONN_POOL_ITEMS_HEAD_MAGIC)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
struct m0_sm_group * lo_grp
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL bool m0_rpc_conn_pool_session_established(struct m0_rpc_session *session)
M0_INTERNAL int m0_rpc_link_disconnect_sync(struct m0_rpc_link *rlink, m0_time_t abs_timeout)
static struct m0_rpc_conn_pool_item * find_item_by_ep(struct m0_rpc_conn_pool *pool, const char *remote_ep)
static struct m0_pool pool
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL void m0_rpc_link_connect_async(struct m0_rpc_link *rlink, m0_time_t abs_timeout, struct m0_clink *wait_clink)
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
static struct m0_rpc_conn_pool_item * conn_pool_item_get(struct m0_rpc_conn_pool *pool, const char *remote_ep)
#define M0_FI_ENABLED(tag)
M0_INTERNAL int m0_rpc_conn_pool_get_async(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
#define M0_ALLOC_PTR(ptr)
m0_time_t m0_time_from_now(uint64_t secs, long ns)
static bool pool_item_clink_cb(struct m0_clink *link)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
struct m0_rpc_session rlk_sess
M0_INTERNAL int m0_rpc_conn_pool_get_sync(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
M0_INTERNAL void m0_rpc_link_disconnect_async(struct m0_rpc_link *rlink, m0_time_t abs_timeout, struct m0_clink *wait_clink)
#define m0_tl_for(name, head, obj)
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
static int conn_pool_item_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_conn_pool_item *item, const char *remote_ep)
M0_INTERNAL void m0_rpc_conn_pool_put(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)