Motr  M0
buf.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/arith.h" /* max_check */
24 #include "lib/assert.h"
25 #include "lib/errno.h"
26 #include "lib/time.h"
27 #include "lib/misc.h"
28 #include "lib/finject.h"
29 #include "motr/magic.h"
30 #include "net/net_internal.h"
31 #include "net/addb2.h"
32 
33 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET
34 #include "lib/trace.h"
35 
42 {
43  return qt >= M0_NET_QT_MSG_RECV && qt < M0_NET_QT_NR;
44 }
45 
46 M0_INTERNAL bool m0_net__buffer_invariant(const struct m0_net_buffer *buf)
47 {
48  return
49  _0C(buf != NULL) &&
50  _0C((buf->nb_flags & M0_NET_BUF_REGISTERED) != 0) &&
51  _0C(buf->nb_dom != NULL) &&
52  _0C(buf->nb_dom->nd_xprt != NULL) &&
53  _0C(buf->nb_buffer.ov_buf != NULL) &&
54  _0C(m0_vec_count(&buf->nb_buffer.ov_vec) != 0) &&
55  ergo((buf->nb_flags & M0_NET_BUF_QUEUED) != 0,
56  _0C(m0_net__qtype_is_valid(buf->nb_qtype)) &&
57  _0C(buf->nb_callbacks != NULL) &&
58  _0C(buf->nb_callbacks->nbc_cb[buf->nb_qtype] != NULL) &&
59  _0C(buf->nb_tm != NULL) &&
60  _0C(buf->nb_dom == buf->nb_tm->ntm_dom) &&
61  M0_CHECK_EX(_0C(m0_net_tm_tlist_contains( /* expensive */
62  &buf->nb_tm->ntm_q[buf->nb_qtype], buf))));
63 }
64 
65 M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf,
66  struct m0_net_domain *dom)
67 {
68  int rc;
69 
70  M0_PRE(dom != NULL);
71  M0_PRE(dom->nd_xprt != NULL);
72 
73  if (M0_FI_ENABLED("fake_error"))
74  return M0_ERR(-EINVAL);
75 
76  m0_mutex_lock(&dom->nd_mutex);
77 
78  M0_PRE_EX(buf != NULL &&
79  buf->nb_dom == NULL &&
80  buf->nb_flags == 0 &&
81  buf->nb_buffer.ov_buf != NULL &&
82  m0_vec_count(&buf->nb_buffer.ov_vec) > 0);
83 
84  buf->nb_dom = dom;
85  buf->nb_xprt_private = NULL;
86  buf->nb_timeout = M0_TIME_NEVER;
87  buf->nb_magic = M0_NET_BUFFER_LINK_MAGIC;
88 
89  /*
90  * The transport will validate buffer size and number of segments, and
91  * optimize it for future use.
92  */
93  rc = dom->nd_xprt->nx_ops->xo_buf_register(buf);
94  if (rc == 0) {
95  buf->nb_flags |= M0_NET_BUF_REGISTERED;
96  m0_list_add_tail(&dom->nd_registered_bufs,&buf->nb_dom_linkage);
97  }
98 
100  M0_POST(ergo(rc == 0, buf->nb_timeout == M0_TIME_NEVER));
101 
102  m0_mutex_unlock(&dom->nd_mutex);
103  return M0_RC(rc);
104 }
105 M0_EXPORTED(m0_net_buffer_register);
106 
107 M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf,
108  struct m0_net_domain *dom)
109 {
110  M0_PRE(dom != NULL);
111  M0_PRE(dom->nd_xprt != NULL);
112  m0_mutex_lock(&dom->nd_mutex);
113 
114  M0_PRE_EX(m0_net__buffer_invariant(buf) && buf->nb_dom == dom);
115  M0_PRE(buf->nb_flags == M0_NET_BUF_REGISTERED);
116  M0_PRE_EX(m0_list_contains(&dom->nd_registered_bufs,
117  &buf->nb_dom_linkage));
118 
119  dom->nd_xprt->nx_ops->xo_buf_deregister(buf);
120  buf->nb_flags &= ~M0_NET_BUF_REGISTERED;
121  m0_list_del(&buf->nb_dom_linkage);
122  buf->nb_xprt_private = NULL;
123  buf->nb_magic = 0;
124  buf->nb_dom = NULL;
125 
126  m0_mutex_unlock(&dom->nd_mutex);
127 }
128 M0_EXPORTED(m0_net_buffer_deregister);
129 
/*
 * Queues "buf" on one of the transfer machine's logical queues and hands it
 * to the transport.  Called with tm->ntm_mutex held (asserted below).
 *
 * The per-queue-type "checks" table selects which validations apply:
 * length bounds, end-point presence, and network-descriptor presence vary
 * by queue type.  On success the buffer stays on the queue with
 * M0_NET_BUF_QUEUED set; if the transport rejects it, it is unlinked again.
 *
 * NOTE(review): this listing is missing source lines 156-157, 164-165, 171,
 * 192 and 240-241; in particular the M0_PRE at line 163 is visibly
 * truncated mid-expression.  Confirm the full set of pre/post-conditions
 * against the upstream file before editing.
 */
130 M0_INTERNAL int m0_net__buffer_add(struct m0_net_buffer *buf,
131  struct m0_net_transfer_mc *tm)
132 {
133  int rc;
134  struct m0_net_domain *dom;
135  struct m0_tl *ql;
/* Per-queue validation plan: which of the checks below apply. */
136  struct buf_add_checks {
137  bool check_length;
138  bool check_ep;
139  bool check_desc;
140  bool post_check_desc;
141  };
142  static const struct buf_add_checks checks[M0_NET_QT_NR] = {
143  [M0_NET_QT_MSG_RECV] = { false, false, false, false },
144  [M0_NET_QT_MSG_SEND] = { true, true, false, false },
145  [M0_NET_QT_PASSIVE_BULK_RECV] = { false, false, false, true },
146  [M0_NET_QT_PASSIVE_BULK_SEND] = { true, false, false, true },
147  [M0_NET_QT_ACTIVE_BULK_RECV] = { false, false, true, false },
148  [M0_NET_QT_ACTIVE_BULK_SEND] = { true, false, true, false }
149  };
150  const struct buf_add_checks *todo;
151  m0_bcount_t count = m0_vec_count(&buf->nb_buffer.ov_vec);
152  m0_time_t now = m0_time_now();
153 
154  M0_PRE(tm != NULL);
155  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
158  M0_PRE(buf->nb_dom == tm->ntm_dom);
159 
160  dom = tm->ntm_dom;
161  M0_PRE(dom->nd_xprt != NULL);
162 
/* NOTE(review): the continuation of this M0_PRE (lines 164-165, presumably
 * the forbidden-flags mask) is missing from this listing. */
163  M0_PRE(!(buf->nb_flags &
/* Receive buffers must not carry an end point and must advertise sane
 * minimum-receive parameters. */
166  M0_PRE(ergo(buf->nb_qtype == M0_NET_QT_MSG_RECV,
167  buf->nb_ep == NULL &&
168  buf->nb_min_receive_size != 0 &&
169  buf->nb_min_receive_size <= count &&
170  buf->nb_max_receive_msgs != 0));
172 
173  /* determine what to do by queue type */
174  todo = &checks[buf->nb_qtype];
175  ql = &tm->ntm_q[buf->nb_qtype];
176 
177  /*
178  * Validate that the length is set and is within buffer bounds. The
179  * transport will make other checks on the buffer, such as the max size
180  * and number of segments.
181  */
182  M0_PRE(ergo(todo->check_length, buf->nb_length > 0 &&
183  (buf->nb_length + buf->nb_offset) <= count));
184 
185  /* validate end point usage; increment ref count later */
186  M0_PRE(ergo(todo->check_ep,
187  buf->nb_ep != NULL &&
188  m0_net__ep_invariant(buf->nb_ep, tm, true)));
189 
190  /* validate that the descriptor is present */
/* Passive bulk queues: clear any stale descriptor; the transport is
 * expected to fill one in (checked by the M0_POST below). */
191  if (todo->post_check_desc) {
193  buf->nb_desc.nbd_len = 0;
194  buf->nb_desc.nbd_data = NULL;
195  }
196  M0_PRE(ergo(todo->check_desc,
197  buf->nb_desc.nbd_len > 0 &&
198  buf->nb_desc.nbd_data != NULL));
199 
200  /* validate that a timeout, if set, is in the future */
201  if (buf->nb_timeout != M0_TIME_NEVER) {
202  /* Don't want to assert here as scheduling is unpredictable. */
203  if (now >= buf->nb_timeout) {
204  rc = -ETIME; /* not -ETIMEDOUT */
205  goto m_err_exit;
206  }
207  }
208 
209  /*
210  * Optimistically add it to the queue's list before calling the xprt.
211  * Post will unlink on completion, or del on cancel.
212  */
213  m0_net_tm_tlink_init_at_tail(buf, ql);
214  buf->nb_flags |= M0_NET_BUF_QUEUED;
215  buf->nb_add_time = now; /* record time added */
216  buf->nb_msgs_received = 0;
217 
218  /* call the transport */
219  buf->nb_tm = tm;
220  rc = dom->nd_xprt->nx_ops->xo_buf_add(buf);
221  if (rc != 0) {
/* transport refused the buffer: undo the optimistic queueing */
222  m0_net_tm_tlink_del_fini(buf);
223  buf->nb_flags &= ~M0_NET_BUF_QUEUED;
224  goto m_err_exit;
225  }
226 
227  tm->ntm_qstats[buf->nb_qtype].nqs_num_adds += 1;
228 
229  if (todo->check_ep) {
230  /* Bump the reference count.
231  Should be decremented in m0_net_buffer_event_post().
232  The caller holds a reference to the end point.
233  */
234  m0_net_end_point_get(buf->nb_ep);
235  }
236 
237  M0_POST(ergo(todo->post_check_desc,
238  buf->nb_desc.nbd_len != 0 &&
239  buf->nb_desc.nbd_data != NULL));
/* NOTE(review): lines 240-241 (likely an expensive postcondition) are
 * missing from this listing. */
230  242 
231  243  m_err_exit:
244  return M0_RC(rc);
245 }
246 
247 M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf,
248  struct m0_net_transfer_mc *tm)
249 {
250  int rc;
251  M0_PRE(tm != NULL);
252  if (M0_FI_ENABLED("fake_error"))
253  return M0_ERR(-EMSGSIZE);
254  m0_mutex_lock(&tm->ntm_mutex);
255  rc = m0_net__buffer_add(buf, tm);
256  m0_mutex_unlock(&tm->ntm_mutex);
257  return M0_RC(rc);
258 }
259 M0_EXPORTED(m0_net_buffer_add);
260 
261 M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf,
262  struct m0_net_transfer_mc *tm)
263 {
264  struct m0_net_domain *dom;
265  bool rc = true;
266 
267  M0_PRE(tm != NULL && tm->ntm_dom != NULL);
268  M0_PRE(buf != NULL);
269 
270  dom = tm->ntm_dom;
271  M0_PRE(dom->nd_xprt != NULL);
272  m0_mutex_lock(&tm->ntm_mutex);
273 
275  M0_PRE(buf->nb_tm == NULL || buf->nb_tm == tm);
276 
277  if (!(buf->nb_flags & M0_NET_BUF_QUEUED)) {
278  /* completion race condition? no error */
279  rc = false;
280  goto m_err_exit;
281  }
282 
283  /* tell the transport to cancel */
284  dom->nd_xprt->nx_ops->xo_buf_del(buf);
285 
286  tm->ntm_qstats[buf->nb_qtype].nqs_num_dels += 1;
287 
290  m_err_exit:
291  m0_mutex_unlock(&tm->ntm_mutex);
292 
293  return rc;
294 }
295 M0_EXPORTED(m0_net_buffer_del);
296 
298  *ev)
299 {
300  int32_t status = ev->nbe_status;
301  uint64_t flags = ev->nbe_buffer->nb_qtype;
302 
303  return
304  _0C(status <= 0) &&
305  _0C(ergo(status == 0 &&
307  /* don't check ep invariant here */
308  ev->nbe_ep != NULL)) &&
309  _0C(ergo(flags & M0_NET_BUF_CANCELLED, status == -ECANCELED)) &&
310  _0C(ergo(flags & M0_NET_BUF_TIMED_OUT, status == -ETIMEDOUT)) &&
311  _0C(ergo(flags & M0_NET_BUF_RETAIN, status == 0));
312 }
313 
/*
 * Delivers a buffer completion event to the buffer's completion callback.
 *
 * Called WITHOUT the transfer-machine mutex (asserted below).  Under the
 * mutex it dequeues the buffer (unless M0_NET_BUF_RETAIN is set), updates
 * per-queue statistics, and resolves which end point reference to release;
 * the callback itself is invoked after the mutex is dropped.
 *
 * NOTE(review): this listing is missing source lines 325, 330, 334,
 * 341-343, 347, 350-351, 363-364, 401, 405 and 411-412.  In particular the
 * declaration of "cb" (line 325), the tail of the M0_IN() tuple (363-364),
 * the body guarded by "if (pool != NULL && !retain)" (405, presumably the
 * receive-queue re-provisioning call) and the end-point put under
 * "if (ep != NULL)" (411-412) are absent — confirm against upstream
 * before editing.
 */
314 M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
315 {
316  struct m0_net_buffer *buf = NULL;
317  struct m0_net_end_point *ep;
318  bool check_ep;
319  bool retain;
320  enum m0_net_queue_type qtype = M0_NET_QT_NR;
321  struct m0_net_transfer_mc *tm;
322  struct m0_net_qstats *q;
323  m0_time_t tdiff;
324  m0_time_t addtime;
326  m0_bcount_t len = 0;
327  struct m0_net_buffer_pool *pool = NULL;
328  uint64_t flags;
329 
331  buf = ev->nbe_buffer;
332  tm = buf->nb_tm;
333  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
334 
335  /*
336  * pre-callback, in mutex:
337  * update buffer (if present), state and statistics
338  */
339  m0_mutex_lock(&tm->ntm_mutex);
340  flags = buf->nb_flags;
344 
/* Retained buffers stay queued across this event; otherwise unlink and
 * reset the timeout. */
345  retain = flags & M0_NET_BUF_RETAIN;
346  if (retain) {
348  } else {
349  m0_net_tm_tlist_del(buf);
352  buf->nb_timeout = M0_TIME_NEVER;
353  }
354 
355  qtype = buf->nb_qtype;
356  q = &tm->ntm_qstats[qtype];
357  if (ev->nbe_status < 0) {
358  q->nqs_num_f_events++;
359  len = 0; /* length not counted on failure */
360  } else {
361  q->nqs_num_s_events++;
/* receive-side queues report the actual length in the event; send-side
 * queues use the buffer's own length */
362  if (M0_IN(qtype, (M0_NET_QT_MSG_RECV,
365  len = ev->nbe_length;
366  else
367  len = buf->nb_length;
368  }
369  addtime = buf->nb_add_time;
370  tdiff = m0_time_sub(ev->nbe_time, addtime);
371  if (!retain)
372  q->nqs_time_in_queue = m0_time_add(q->nqs_time_in_queue, tdiff);
373  q->nqs_total_bytes += len;
374  q->nqs_max_bytes = max_check(q->nqs_max_bytes, len);
375 
/* Determine which end point reference (if any) must be released after the
 * callback, and whether the receive pool should be re-provisioned. */
376  ep = NULL;
377  check_ep = false;
378  switch (qtype) {
379  case M0_NET_QT_MSG_RECV:
380  if (ev->nbe_status == 0) {
381  check_ep = true;
382  ep = ev->nbe_ep; /* from event */
383  ++buf->nb_msgs_received;
384  }
385  if (!retain && tm->ntm_state == M0_NET_TM_STARTED &&
386  tm->ntm_recv_pool != NULL)
387  pool = tm->ntm_recv_pool;
388  break;
389  case M0_NET_QT_MSG_SEND:
390  /* must put() ep to match get in buffer_add() */
391  ep = buf->nb_ep; /* from buffer */
392  break;
393  default:
394  break;
395  }
396 
397  if (check_ep) {
398  M0_ASSERT(m0_net__ep_invariant(ep, tm, true));
399  }
400  cb = buf->nb_callbacks->nbc_cb[qtype];
402  buf->nb_flags = flags;
403  m0_mutex_unlock(&tm->ntm_mutex);
/* callback runs outside the mutex to allow re-queueing from within it */
404  if (pool != NULL && !retain)
406  cb(ev);
407  M0_ADDB2_ADD(M0_AVI_NET_BUF, (uint64_t)buf, qtype,
408  addtime, tdiff, ev->nbe_status, len);
409  /* Decrement the reference to the ep */
410  if (ep != NULL)
413 }
414 
415 #undef M0_TRACE_SUBSYSTEM
416 
419 /*
420  * Local variables:
421  * c-indentation-style: "K&R"
422  * c-basic-offset: 8
423  * tab-width: 8
424  * fill-column: 79
425  * scroll-step: 1
426  * End:
427  */
uint64_t nqs_num_adds
Definition: net.h:764
#define M0_PRE(cond)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
int const char const void size_t int flags
Definition: dir.c:328
static struct m0_semaphore q
Definition: rwlock.c:55
#define NULL
Definition: misc.h:38
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:65
#define ergo(a, b)
Definition: misc.h:293
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
uint64_t nqs_num_dels
Definition: net.h:769
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
struct m0_net_qstats ntm_qstats[M0_NET_QT_NR]
Definition: net.h:880
#define max_check(a, b)
Definition: arith.h:95
enum m0_net_tm_state ntm_state
Definition: net.h:819
struct m0_net_domain * ntm_dom
Definition: net.h:853
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
Definition: list.c:147
M0_INTERNAL bool m0_net__ep_invariant(struct m0_net_end_point *ep, struct m0_net_transfer_mc *tm, bool under_tm_mutex)
Definition: ep.c:42
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:261
m0_time_t nbe_time
Definition: net.h:1197
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
void(* m0_net_buffer_cb_proc_t)(const struct m0_net_buffer_event *ev)
Definition: net.h:1259
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
M0_INTERNAL void m0_net__tm_provision_recv_q(struct m0_net_transfer_mc *tm)
Definition: tm_provision.c:509
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
#define M0_CHECK_EX(cond)
return M0_RC(rc)
Definition: sock.c:754
M0_INTERNAL bool m0_net__buffer_event_invariant(const struct m0_net_buffer_event *ev)
Definition: buf.c:297
int32_t nbe_status
Definition: net.h:1218
return M0_ERR(-EOPNOTSUPP)
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL bool m0_net__qtype_is_valid(enum m0_net_queue_type qt)
Definition: buf.c:41
m0_time_t m0_time_now(void)
Definition: time.c:134
Definition: tlist.h:251
static struct m0_stob_domain * dom
Definition: storage.c:38
uint32_t ntm_callback_counter
Definition: net.h:850
M0_INTERNAL bool m0_list_contains(const struct m0_list *list, const struct m0_list_link *link)
Definition: list.c:87
#define M0_POST(cond)
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
struct m0_tl ntm_q[M0_NET_QT_NR]
Definition: net.h:877
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
static struct m0_pool pool
Definition: iter_ut.c:58
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:107
char * ep
Definition: sw.h:132
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_INTERNAL bool m0_net__buffer_invariant(const struct m0_net_buffer *buf)
Definition: buf.c:46
M0_INTERNAL int m0_net__buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:130
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: queue.c:27
m0_net_queue_type
Definition: net.h:591
M0_INTERNAL bool m0_net__tm_invariant(const struct m0_net_transfer_mc *tm)
Definition: tm.c:67
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:247
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
M0_INTERNAL void m0_net__tm_post_callback(struct m0_net_transfer_mc *tm)
Definition: tm.c:123
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_list_add_tail(struct m0_list *head, struct m0_list_link *new)
Definition: list.c:119
#define M0_PRE_EX(cond)
int32_t rc
Definition: trigger_fop.h:47
#define M0_POST_EX(cond)
struct m0_net_buffer_pool * ntm_recv_pool
Definition: net.h:896