Motr  M0
mem_xprt_msg.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 /* This file is included into mem_xprt_xo.c */
24 
30 static void mem_wf_msg_recv_cb(struct m0_net_transfer_mc *tm,
31  struct m0_net_bulk_mem_work_item *wi)
32 {
33  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
34 
35  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
36  M0_PRE(nb != NULL &&
38  nb->nb_tm == tm &&
39  (wi->xwi_status < 0 || /* failed or we have a non-zero msg*/
40  (wi->xwi_nbe_ep != NULL && wi->xwi_nbe_length > 0)));
42 
43  /* post the recv completion callback (will clear M0_NET_BUF_IN_USE) */
45  return;
46 }
47 
67 static int mem_find_remote_tm(struct m0_net_transfer_mc *tm,
68  struct m0_net_end_point *match_ep,
69  struct m0_net_transfer_mc **p_dest_tm,
70  struct m0_net_end_point **p_dest_ep)
71 {
72  struct m0_net_domain *mydom = tm->ntm_dom;
73  struct m0_net_transfer_mc *dest_tm = NULL;
74  struct m0_net_end_point *dest_ep = NULL;
75  struct m0_net_bulk_mem_domain_pvt *dp;
76  struct m0_net_transfer_mc *itm;
77  struct m0_net_bulk_mem_end_point *mep;
78  int rc = 0;
79 
80  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
83 
84  /* iterate over in-mem domains to find the destination TM */
85 
89  xd_dom_linkage) {
90  if (dp->xd_dom == mydom)
91  continue; /* skip self */
92  /* iterate over TM's in domain */
95  struct m0_net_transfer_mc,
96  ntm_dom_linkage) {
97  m0_mutex_lock(&itm->ntm_mutex);
99  do { /* provides break context */
100  if (itm->ntm_state != M0_NET_TM_STARTED)
101  break; /* ignore */
102  if (!mem_eps_are_equal(itm->ntm_ep, match_ep))
103  break;
104 
105  /* Found the matching TM. */
106  dest_tm = itm;
107  if (p_dest_ep == NULL)
108  break;
109  /* We need to create an EP for the local TM
110  address in the remote DOM.
111  We don't need the domain lock (do need TM
112  mutex) but the original logic required it so
113  leave it that way...
114  */
115  mep = mem_ep_to_pvt(tm->ntm_ep);
116  rc = mem_bmo_ep_create(&dest_ep,
117  dest_tm,
118  &mep->xep_sa,
119  mep->xep_service_id);
120  } while(0);
121  if (dest_tm != NULL) {
122  /* found the TM */
123  if (rc != 0) {
124  /* ... but failed on EP */
125  m0_mutex_unlock(&dest_tm->ntm_mutex);
126  dest_tm = NULL;
127  }
128  break;
129  }
130  m0_mutex_unlock(&itm->ntm_mutex);
131  }
133  if (dest_tm != NULL || rc != 0)
134  break;
135  }
136  if (rc == 0 && dest_tm == NULL)
137  rc = -ENETUNREACH; /* search exhausted */
139  M0_ASSERT(rc != 0 ||
140  (dest_tm != NULL &&
141  m0_mutex_is_locked(&dest_tm->ntm_mutex) &&
142  dest_tm->ntm_state == M0_NET_TM_STARTED));
143  if (!rc) {
144  M0_ASSERT(mem_tm_invariant(dest_tm));
145  *p_dest_tm = dest_tm;
146  if (p_dest_ep != NULL) {
147  M0_ASSERT(dest_ep != NULL);
148  *p_dest_ep = dest_ep;
149  }
150  }
151  return M0_RC(rc);
152 }
153 
154 
166 static void mem_wf_msg_send(struct m0_net_transfer_mc *tm,
167  struct m0_net_bulk_mem_work_item *wi)
168 {
169  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
170  int rc;
171  struct m0_net_transfer_mc *dest_tm = NULL;
172  struct m0_net_end_point *dest_ep = NULL;
173  struct m0_net_buffer *dest_nb = NULL;
174 
175  M0_PRE(nb != NULL &&
176  nb->nb_qtype == M0_NET_QT_MSG_SEND &&
177  nb->nb_tm == tm &&
178  nb->nb_ep != NULL);
180 
181  do {
182  bool found_dest_nb = false;
183  struct m0_net_bulk_mem_work_item *dest_wi;
184  struct m0_net_bulk_mem_tm_pvt *dest_tp;
185 
186  /* Search for a remote TM matching the destination address,
187  and if found, create an EP in the remote TM's domain with
188  the local TM's address.
189  */
190  rc = mem_find_remote_tm(tm, nb->nb_ep, &dest_tm, &dest_ep);
191  if (rc != 0)
192  break;
193 
194  /* We're now operating in the destination TM while holding
195  its mutex. The destination TM is operative.
196  */
197 
198  /* get the first available receive buffer */
199  m0_tl_for(m0_net_tm, &dest_tm->ntm_q[M0_NET_QT_MSG_RECV],
200  dest_nb) {
201  if ((dest_nb->nb_flags &
203  found_dest_nb = true;
204  break;
205  }
206  } m0_tl_endfor;
207  if (!found_dest_nb) {
209  ++;
210  rc = -ENOBUFS;
211  mem_post_error(dest_tm, rc);
212  break;
213  }
216  dest_nb->nb_flags |= M0_NET_BUF_IN_USE;
217  dest_wi = mem_buffer_to_wi(dest_nb);
218  if (nb->nb_length > mem_buffer_length(dest_nb)) {
219  rc = -EMSGSIZE;
220  /* set the desired length in the event */
221  dest_wi->xwi_nbe_length = nb->nb_length;
222  } else {
223  rc = mem_copy_buffer(dest_nb, nb, nb->nb_length);
224  dest_wi->xwi_nbe_length = nb->nb_length;
225  }
226  if (rc == 0) {
227  /* commit to using the destination EP */
228  dest_wi->xwi_nbe_ep = dest_ep;
229  dest_ep = NULL; /* do not release below */
230  }
231  dest_wi->xwi_status = rc; /* recv error code */
232 
233  /* schedule the receive msg callback */
234  dest_wi->xwi_op = M0_NET_XOP_MSG_RECV_CB;
235 
236  dest_tp = mem_tm_to_pvt(dest_tm);
237  mem_wi_add(dest_wi, dest_tp);
238  } while (0);
239 
240  /* release the destination TM mutex */
241  if (dest_tm != NULL)
242  m0_mutex_unlock(&dest_tm->ntm_mutex);
243 
244  /* release the destination EP, if still referenced,
245  outside of any mutex
246  */
247  if (dest_ep != NULL)
248  m0_net_end_point_put(dest_ep);
249 
250  /* post the send completion callback (will clear M0_NET_BUF_IN_USE) */
251  wi->xwi_status = rc;
253  return;
254 }
255  /* bulkmem */
257 
258 /*
259  * Local variables:
260  * c-indentation-style: "K&R"
261  * c-basic-offset: 8
262  * tab-width: 8
263  * fill-column: 80
264  * scroll-step: 1
265  * End:
266  */
uint64_t nqs_num_f_events
Definition: net.h:784
static void mem_wf_msg_send(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_msg.c:166
static void mem_wf_msg_recv_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_msg.c:30
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static struct m0_net_buffer * mem_wi_to_buffer(struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt.h:386
#define NULL
Definition: misc.h:38
static bool mem_tm_invariant(const struct m0_net_transfer_mc *tm)
static int mem_copy_buffer(struct m0_net_buffer *dest_nb, struct m0_net_buffer *src_nb, m0_bcount_t num_bytes)
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
static struct m0_list mem_domains
Definition: mem_xprt_xo.c:39
struct m0_net_qstats ntm_qstats[M0_NET_QT_NR]
Definition: net.h:880
enum m0_net_tm_state ntm_state
Definition: net.h:819
static void mem_wi_add(struct m0_net_bulk_mem_work_item *wi, struct m0_net_bulk_mem_tm_pvt *tp)
m0_bcount_t nb_length
Definition: net.h:1334
static void mem_wi_post_buffer_event(struct m0_net_bulk_mem_work_item *wi)
enum m0_net_bulk_mem_work_opcode xwi_op
Definition: mem_xprt.h:163
uint64_t nb_flags
Definition: net.h:1489
struct m0_mutex nd_mutex
Definition: net.h:381
struct m0_net_domain * ntm_dom
Definition: net.h:853
struct m0_mutex m0_net_mutex
Definition: net.c:59
struct m0_net_end_point * xwi_nbe_ep
Definition: mem_xprt.h:179
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static bool mem_eps_are_equal(const struct m0_net_end_point *ep1, const struct m0_net_end_point *ep2)
Definition: mem_xprt_ep.c:179
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
static struct m0_net_bulk_mem_end_point * mem_ep_to_pvt(const struct m0_net_end_point *ep)
Definition: mem_xprt.h:262
static void mem_post_error(struct m0_net_transfer_mc *tm, int status)
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
m0_bcount_t xwi_nbe_length
Definition: mem_xprt.h:176
#define m0_list_for_each_entry(head, pos, type, member)
Definition: list.h:235
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
static int mem_find_remote_tm(struct m0_net_transfer_mc *tm, struct m0_net_end_point *match_ep, struct m0_net_transfer_mc **p_dest_tm, struct m0_net_end_point **p_dest_ep)
Definition: mem_xprt_msg.c:67
static struct m0_net_bulk_mem_work_item * mem_buffer_to_wi(struct m0_net_buffer *buf)
Definition: mem_xprt.h:399
static m0_bcount_t mem_buffer_length(const struct m0_net_buffer *nb)
struct m0_tl ntm_q[M0_NET_QT_NR]
Definition: net.h:877
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
struct m0_net_end_point * ntm_ep
Definition: net.h:868
M0_INTERNAL bool m0_net__tm_invariant(const struct m0_net_transfer_mc *tm)
Definition: tm.c:67
struct sockaddr_in xep_sa
Definition: mem_xprt.h:244
static bool mem_buffer_invariant(const struct m0_net_buffer *nb)
struct m0_net_domain * xd_dom
Definition: mem_xprt.h:335
static struct m0_net_bulk_mem_tm_pvt * mem_tm_to_pvt(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt.h:231
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
int32_t rc
Definition: trigger_fop.h:47
struct m0_list nd_tms
Definition: net.h:390
struct m0_net_end_point * nb_ep
Definition: net.h:1424
static int mem_bmo_ep_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const struct sockaddr_in *sa, uint32_t id)
Definition: mem_xprt_pvt.h:96