Motr M0
lnet_tm.c
/* -*- C -*- */
/*
 * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * For any questions about this software or licensing,
 * please email opensource@seagate.com or cortx-questions@seagate.com.
 *
 */

/**
   @addtogroup LNetXODFS
   @{
 */

static inline bool all_tm_queues_are_empty(struct m0_net_transfer_mc *tm)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(tm->ntm_q); ++i)
                if (!m0_net_tm_tlist_is_empty(&tm->ntm_q[i]))
                        return false;
        return true;
}
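The same emptiness check can also be phrased with the m0_forall() iteration macro from lib/misc.h. A minimal sketch, shown only as an illustration (the helper name below is not part of this file):

static inline bool all_tm_queues_are_empty_sketch(struct m0_net_transfer_mc *tm)
{
        /* true iff every logical queue of the transfer machine is empty */
        return m0_forall(i, ARRAY_SIZE(tm->ntm_q),
                         m0_net_tm_tlist_is_empty(&tm->ntm_q[i]));
}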

/**
   Cancels buffer operations that have timed out.
   @param tm  The transfer machine concerned.
   @param now The current time.
   @pre m0_mutex_is_locked(&tm->ntm_mutex)
   @return The number of buffers timed out.
 */
static int nlx_tm_timeout_buffers(struct m0_net_transfer_mc *tm,
                                  m0_time_t now)
{
        int qt;
        struct m0_net_buffer *nb;
        struct nlx_xo_transfer_mc *tp M0_UNUSED;
        int i;

        M0_PRE(tm != NULL && nlx_tm_invariant(tm));
        M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));

        tp = tm->ntm_xprt_private;
        NLXDBGP(tp, 2, "%p: nlx_tm_timeout_buffers\n", tp);
        for (i = 0, qt = M0_NET_QT_MSG_RECV; qt < M0_NET_QT_NR; ++qt) {
                m0_tl_for(m0_net_tm, &tm->ntm_q[qt], nb) {
                        /* nb_timeout set to M0_TIME_NEVER if disabled */
                        if (nb->nb_timeout > now)
                                continue;
                        nb->nb_flags |= M0_NET_BUF_TIMED_OUT;
                        nlx_xo_buf_del(nb); /* cancel if possible; !dequeued */
                        ++i;
                } m0_tl_endfor;
        }
        return i;
}
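The sweep above compares each buffer's nb_timeout, an absolute deadline, against the current time; M0_TIME_NEVER disables the check. A relative allowance has to be converted into such an absolute deadline before it can be compared. A minimal sketch of that conversion, assuming only m0_time_add() and the M0_TIME_NEVER sentinel from lib/time.h (the helper itself is illustrative, not part of this file):

static m0_time_t sketch_buffer_deadline(const struct m0_net_buffer *nb,
                                        m0_time_t rel_timeout)
{
        /* a disabled timeout is represented by the M0_TIME_NEVER sentinel */
        if (rel_timeout == M0_TIME_NEVER)
                return M0_TIME_NEVER;
        /* absolute deadline: time the buffer was queued plus the allowance */
        return m0_time_add(nb->nb_add_time, rel_timeout);
}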

/**
   Subroutine returning the interval at which the event worker sweeps the
   transfer machine queues for timed out buffers.
 */
static m0_time_t
nlx_tm_get_buffer_timeout_tick(const struct m0_net_transfer_mc *tm)
{
        return m0_time(1, 0); /* assumed: a one-second sweep tick */
}

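The event worker below reaches this subroutine through the NLX_tm_get_buffer_timeout_tick() macro defined in lnet_xo.c, an indirection that lets unit tests substitute their own tick. One plausible shape of that forwarding macro, shown purely as an assumption:

#ifndef NLX_tm_get_buffer_timeout_tick
#define NLX_tm_get_buffer_timeout_tick(tm) nlx_tm_get_buffer_timeout_tick(tm)
#endif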
/**
   Body of the transfer machine event worker thread: it starts the core
   transfer machine, posts the resulting state change event, and then loops
   delivering buffer events and timing out buffers until the transfer
   machine is stopped.
 */
static void nlx_tm_ev_worker(struct m0_net_transfer_mc *tm)
{
        struct nlx_xo_transfer_mc *tp;
        struct nlx_core_transfer_mc *ctp;
        struct nlx_xo_domain *dp;
        struct nlx_core_domain *cd;
        struct m0_net_tm_event tmev = {
                .nte_type = M0_NET_TEV_STATE_CHANGE,
                .nte_tm = tm,
                .nte_status = 0
        };
        m0_time_t timeout;
        m0_time_t next_buffer_timeout;
        m0_time_t buffer_timeout_tick;
        m0_time_t now;
        int rc = 0;

        m0_mutex_lock(&tm->ntm_mutex);
        tp = tm->ntm_xprt_private;
        ctp = &tp->xtm_core;
        dp = tm->ntm_dom->nd_xprt_private;
        cd = &dp->xd_core;

        nlx_core_tm_set_debug(ctp, tp->_debug_);

        if (tp->xtm_processors.b_nr != 0) {
                M0_ASSERT(m0_thread_self() == &tp->xtm_ev_thread);
                rc = m0_thread_confine(&tp->xtm_ev_thread,
                                       &tp->xtm_processors);
        }

        if (rc == 0)
                rc = nlx_core_tm_start(cd, tm, ctp);
        if (rc == 0) {
                rc = nlx_ep_create(&tmev.nte_ep, tm, &ctp->ctm_addr);
                if (rc != 0)
                        nlx_core_tm_stop(cd, ctp);
        }

        /*
          Deliver a M0_NET_TEV_STATE_CHANGE event to transition the TM to
          the M0_NET_TM_STARTED or M0_NET_TM_FAILED states.
          Set the transfer machine's end point in the event on success.
         */
        if (rc == 0) {
                tmev.nte_next_state = M0_NET_TM_STARTED;
        } else {
                tmev.nte_next_state = M0_NET_TM_FAILED;
                tmev.nte_status = rc;
        }
        tmev.nte_time = m0_time_now();
        tm->ntm_ep = NULL;
        m0_mutex_unlock(&tm->ntm_mutex);
        m0_net_tm_event_post(&tmev);
        if (rc != 0)
                return;

        m0_mutex_lock(&tm->ntm_mutex);
        now = m0_time_now();
        m0_mutex_unlock(&tm->ntm_mutex);

        buffer_timeout_tick = NLX_tm_get_buffer_timeout_tick(tm);
        next_buffer_timeout = m0_time_add(now, buffer_timeout_tick);

        NLXDBGP(tp, 1, "%p: tm_worker_thread started\n", tp);

        while (1) {
                /* Compute next timeout (short if automatic or stopping).
                   Upper bound constrained by the next stat schedule time.
                 */
                if (tm->ntm_bev_auto_deliver ||
                    tm->ntm_state == M0_NET_TM_STOPPING)
                        timeout = m0_time_from_now(1, 0);  /* assumed: short wait */
                else
                        timeout = m0_time_from_now(10, 0); /* assumed: longer wait */
                if (timeout > next_buffer_timeout)
                        timeout = next_buffer_timeout;

                if (tm->ntm_bev_auto_deliver) {
                        rc = NLX_core_buf_event_wait(cd, ctp, timeout);
                        /* buffer event processing */
                        if (rc == 0) { /* did not time out - events pending */
                                m0_mutex_lock(&tm->ntm_mutex);
                                nlx_xo_bev_deliver_all(tm);
                                m0_mutex_unlock(&tm->ntm_mutex);
                        }
                } else { /* application initiated delivery */
                        m0_mutex_lock(&tm->ntm_mutex);
                        if (tp->xtm_ev_chan == NULL)
                                m0_cond_timedwait(&tp->xtm_ev_cond, timeout);
                        if (tp->xtm_ev_chan != NULL) {
                                m0_mutex_unlock(&tm->ntm_mutex);
                                rc = nlx_core_buf_event_wait(cd, ctp, timeout);
                                m0_mutex_lock(&tm->ntm_mutex);
                                if (rc == 0 && tp->xtm_ev_chan != NULL) {
                                        if (tp->xtm_ev_chan == &tm->ntm_chan) {
                                                m0_chan_signal(&tm->ntm_chan);
                                        } else {
                                                m0_chan_signal_lock(
                                                        tp->xtm_ev_chan);
                                        }
                                        tp->xtm_ev_chan = NULL;
                                }
                        }
                        m0_mutex_unlock(&tm->ntm_mutex);
                }

                /* periodically record statistics and time out buffers */
                now = m0_time_now();
                m0_mutex_lock(&tm->ntm_mutex);
                if (now >= next_buffer_timeout) {
                        NLX_tm_timeout_buffers(tm, now);
                        next_buffer_timeout = m0_time_add(now,
                                                          buffer_timeout_tick);
                }
                m0_mutex_unlock(&tm->ntm_mutex);

                /* termination processing */
                if (tm->ntm_state == M0_NET_TM_STOPPING) {
                        bool must_stop = false;
                        m0_mutex_lock(&tm->ntm_mutex);
                        if (all_tm_queues_are_empty(tm) &&
                            tm->ntm_callback_counter == 0) {
                                nlx_core_tm_stop(cd, ctp);
                                must_stop = true;
                        }
                        m0_mutex_unlock(&tm->ntm_mutex);
                        if (must_stop) {
                                tmev.nte_next_state = M0_NET_TM_STOPPED;
                                tmev.nte_time = m0_time_now();
                                m0_net_tm_event_post(&tmev);
                                break;
                        }
                }
        }
}
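The "application initiated delivery" branch of the worker pairs with synchronous buffer event delivery on the consumer side: the application registers a wakeup channel, waits, and then drains pending events itself. A minimal sketch of that usage, assuming the transfer machine was earlier switched to synchronous delivery with m0_net_buffer_event_deliver_synchronously(), and assuming the m0_net_buffer_event_notify(), m0_net_buffer_event_pending() and m0_net_buffer_event_deliver_all() interfaces from net/net.h together with the m0_chan/m0_clink primitives from lib/chan.h:

static void sketch_drain_buffer_events(struct m0_net_transfer_mc *tm,
                                       struct m0_chan *chan)
{
        struct m0_clink cl;

        m0_clink_init(&cl, NULL);
        m0_clink_add_lock(chan, &cl);
        if (!m0_net_buffer_event_pending(tm)) {
                /* ask the worker to signal chan when the next event arrives */
                m0_net_buffer_event_notify(tm, chan);
                if (!m0_net_buffer_event_pending(tm))
                        m0_chan_wait(&cl); /* woken via m0_chan_signal*() in the worker */
        }
        m0_net_buffer_event_deliver_all(tm); /* run completion callbacks */
        m0_clink_del_lock(&cl);
        m0_clink_fini(&cl);
}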

/**
   Helper subroutine that converts a core buffer event into a network buffer
   event for delivery to the application.
   @pre m0_mutex_is_locked(&tm->ntm_mutex)
 */
static int nlx_xo_core_bev_to_net_bev(struct m0_net_transfer_mc *tm,
                                      struct nlx_core_buffer_event *lcbev,
                                      struct m0_net_buffer_event *nbev)
{
        struct nlx_xo_transfer_mc *tp M0_UNUSED;
        struct m0_net_buffer *nb;
        int rc = 0;

        M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
        tp = tm->ntm_xprt_private;

        /* Recover the transport space network buffer address from the
           opaque data set during registration.
         */
        nb = (struct m0_net_buffer *) lcbev->cbe_buffer_id;
        M0_ASSERT(m0_net__buffer_invariant(nb));

        M0_SET0(nbev);
        nbev->nbe_buffer = nb;
        nbev->nbe_status = lcbev->cbe_status;
        nbev->nbe_time = m0_time_add(lcbev->cbe_time, nb->nb_add_time);
        if (!lcbev->cbe_unlinked)
                nb->nb_flags |= M0_NET_BUF_RETAIN;
        else
                nb->nb_flags &= ~M0_NET_BUF_RETAIN;
        if (nbev->nbe_status != 0) {
                if (nbev->nbe_status == -ECANCELED &&
                    nb->nb_flags & M0_NET_BUF_TIMED_OUT)
                        nbev->nbe_status = -ETIMEDOUT;
                goto done; /* this is not an error from this sub */
        } else
                nb->nb_flags &= ~M0_NET_BUF_TIMED_OUT;

        if (nb->nb_qtype == M0_NET_QT_MSG_RECV) {
                rc = NLX_ep_create(&nbev->nbe_ep, tm, &lcbev->cbe_sender);
                if (rc != 0) {
                        nbev->nbe_status = rc;
                        goto done;
                }
                nbev->nbe_offset = lcbev->cbe_offset;
        }
        if (nb->nb_qtype == M0_NET_QT_MSG_RECV ||
            nb->nb_qtype == M0_NET_QT_PASSIVE_BULK_RECV ||
            nb->nb_qtype == M0_NET_QT_ACTIVE_BULK_RECV) {
                nbev->nbe_length = lcbev->cbe_length;
        }
 done:
        NLXDBG(tp,2,nlx_print_core_buffer_event("bev_to_net_bev: cbev", lcbev));
        NLXDBG(tp,2,nlx_print_net_buffer_event("bev_to_net_bev: nbev:", nbev));
        NLXDBG(tp,2,NLXP("bev_to_net_bev: rc=%d\n", rc));

        M0_POST(ergo(nb->nb_flags & M0_NET_BUF_RETAIN,
                     !lcbev->cbe_unlinked));
        /* currently we only support RETAIN for received messages */
        M0_POST(ergo(nb->nb_flags & M0_NET_BUF_RETAIN,
                     nb->nb_qtype == M0_NET_QT_MSG_RECV));
        M0_POST(rc == 0 || rc == -ENOMEM);
        M0_POST(m0_net__buffer_event_invariant(nbev));
        return M0_RC(rc);
}
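For message receive buffers the delivered event carries the sender end point, nbe_offset and nbe_length, and while M0_NET_BUF_RETAIN is set the buffer remains queued so that further messages can be delivered into it. A minimal sketch of a receive completion callback consuming those fields; the callback shape follows the m0_net_buffer_callbacks convention, and consume_msg() is a hypothetical application routine:

static void consume_msg(struct m0_net_buffer *nb, m0_bindex_t offset,
                        m0_bcount_t length); /* hypothetical application hook */

static void sketch_msg_recv_cb(const struct m0_net_buffer_event *ev)
{
        struct m0_net_buffer *nb = ev->nbe_buffer;

        if (ev->nbe_status != 0)
                return; /* e.g. -ETIMEDOUT or -ECANCELED */
        /* the received message occupies [nbe_offset, nbe_offset + nbe_length) */
        consume_msg(nb, ev->nbe_offset, ev->nbe_length);
        /* once the buffer is no longer retained (unlinked), the application
           must re-queue it before it can receive again */
}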

/** @} */ /* LNetXODFS */

/*
 * Local variables:
 * c-indentation-style: "K&R"
 * c-basic-offset: 8
 * tab-width: 8
 * fill-column: 80
 * scroll-step: 1
 * End:
 */