Motr  M0
mem_xprt_tm.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 /* This file is included into mem_xprt_xo.c */
24 
35 static void mem_wf_state_change(struct m0_net_transfer_mc *tm,
36  struct m0_net_bulk_mem_work_item *wi)
37 {
38  enum m0_net_bulk_mem_tm_state next_state = wi->xwi_next_state;
39  struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
40  struct m0_net_tm_event ev = {
42  .nte_tm = tm,
43  .nte_status = 0
44  };
45 
46  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
47  M0_ASSERT(next_state == M0_NET_XTM_STARTED ||
48  next_state == M0_NET_XTM_STOPPED);
49 
50  if (next_state == M0_NET_XTM_STARTED) {
51  /*
52  Application can call m0_net_tm_stop -> mem_xo_tm_stop
53  before the M0_NET_XTM_STARTED work item gets processed.
54  If that happens, ignore the M0_NET_XTM_STARTED item.
55  */
56  if (tp->xtm_state < M0_NET_XTM_STOPPING) {
58  if (wi->xwi_status != 0) {
61  ev.nte_status = wi->xwi_status;
62  if (wi->xwi_nbe_ep != NULL) {
63  /* must free ep before posting event */
65  wi->xwi_nbe_ep = NULL;
66  }
67  } else {
68  tp->xtm_state = next_state;
70  ev.nte_ep = wi->xwi_nbe_ep;
71  wi->xwi_nbe_ep = NULL;
72  }
73  m0_mutex_unlock(&tm->ntm_mutex);
74  ev.nte_time = m0_time_now();
76  m0_mutex_lock(&tm->ntm_mutex);
77  }
78  if (wi->xwi_nbe_ep != NULL) {
79  /* free the end point if not consumed */
81  wi->xwi_nbe_ep = NULL;
82  }
83  } else { /* M0_NET_XTM_STOPPED, as per assert */
85  tp->xtm_state = next_state;
87 
88  /* broadcast on cond and wait for work item queue to empty */
90  while (!m0_list_is_empty(&tp->xtm_work_list) ||
91  tp->xtm_callback_counter > 1)
93 
94  m0_mutex_unlock(&tm->ntm_mutex);
95  ev.nte_time = m0_time_now();
97  m0_mutex_lock(&tm->ntm_mutex);
98  }
99 
101  m0_free(wi);
102 }
103 
107 static void mem_wf_cancel_cb(struct m0_net_transfer_mc *tm,
108  struct m0_net_bulk_mem_work_item *wi)
109 {
110  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
111 
112  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
114 
115  /* post the completion callback (will clear operation flags) */
116  if ((nb->nb_flags & M0_NET_BUF_TIMED_OUT) != 0)
117  wi->xwi_status = -ETIMEDOUT;
118  else
119  wi->xwi_status = -ECANCELED;
121  return;
122 }
123 
129 static void mem_wf_error_cb(struct m0_net_transfer_mc *tm,
130  struct m0_net_bulk_mem_work_item *wi)
131 {
132  struct m0_net_tm_event ev = {
134  .nte_tm = tm,
135  .nte_status = wi->xwi_status,
136  };
137 
139  M0_PRE(wi->xwi_status < 0);
140  ev.nte_time = m0_time_now();
142  m0_free(wi);
143 }
144 
151 static void mem_post_error(struct m0_net_transfer_mc *tm, int32_t status)
152 {
153  struct m0_net_bulk_mem_tm_pvt *tp = mem_tm_to_pvt(tm);
154  struct m0_net_bulk_mem_work_item *wi;
155  M0_PRE(status < 0);
156  M0_PRE(m0_mutex_is_locked(&tm->ntm_mutex));
157  M0_ALLOC_PTR(wi);
158  if (wi == NULL)
159  return;
161  wi->xwi_status = status;
162  mem_wi_add(wi, tp);
163 }
164 
165 enum {
171 };
172 
180 static void mem_xo_tm_worker(struct m0_net_transfer_mc *tm)
181 {
182  struct m0_net_bulk_mem_tm_pvt *tp;
183  struct m0_net_bulk_mem_domain_pvt *dp;
184  struct m0_list_link *link;
185  struct m0_net_bulk_mem_work_item *wi;
188  bool rc;
189 
190  m0_mutex_lock(&tm->ntm_mutex);
192  tp = mem_tm_to_pvt(tm);
193  dp = mem_dom_to_pvt(tm->ntm_dom);
194 
195  while (1) {
196  while (!m0_list_is_empty(&tp->xtm_work_list)) {
197  link = m0_list_first(&tp->xtm_work_list);
198  wi = m0_list_entry(link,
200  xwi_link);
201  m0_list_del(&wi->xwi_link);
202  fn = dp->xd_ops->bmo_work_fn[wi->xwi_op];
203  M0_ASSERT(fn != NULL);
204 
205  tp->xtm_callback_counter++;
206  if (wi->xwi_op == M0_NET_XOP_STATE_CHANGE) {
207  fn(tm, wi);
208  } else {
209  /* others expect mutex to be released
210  and the M0_NET_BUF_IN_USE flag set
211  if a buffer is involved.
212  */
213  if (wi->xwi_op != M0_NET_XOP_ERROR_CB) {
214  struct m0_net_buffer *nb =
215  mem_wi_to_buffer(wi);
217  }
218  m0_mutex_unlock(&tm->ntm_mutex);
219  fn(tm, wi);
220  m0_mutex_lock(&tm->ntm_mutex);
222  }
223  tp->xtm_callback_counter--;
224  /* signal that wi was removed from queue */
226  }
227  if (tp->xtm_state == M0_NET_XTM_STOPPED)
228  break;
229  do {
232  } while (!rc);
233  }
234 
235  m0_mutex_unlock(&tm->ntm_mutex);
236 }
237 
242 /*
243  * Local variables:
244  * c-indentation-style: "K&R"
245  * c-basic-offset: 8
246  * tab-width: 8
247  * fill-column: 80
248  * scroll-step: 1
249  * End:
250  */
struct m0_list_link xwi_link
Definition: mem_xprt.h:158
M0_INTERNAL void m0_list_link_fini(struct m0_list_link *link)
Definition: list.c:176
#define M0_PRE(cond)
struct m0_cond xtm_work_list_cv
Definition: mem_xprt.h:218
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
enum m0_net_bulk_mem_tm_state xtm_state
Definition: mem_xprt.h:214
static struct m0_net_buffer * mem_wi_to_buffer(struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt.h:386
#define NULL
Definition: misc.h:38
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
static void mem_wf_cancel_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:107
static void mem_wi_add(struct m0_net_bulk_mem_work_item *wi, struct m0_net_bulk_mem_tm_pvt *tp)
static void mem_wi_post_buffer_event(struct m0_net_bulk_mem_work_item *wi)
enum m0_net_bulk_mem_work_opcode xwi_op
Definition: mem_xprt.h:163
uint64_t nb_flags
Definition: net.h:1489
enum m0_net_bulk_mem_tm_state xwi_next_state
Definition: mem_xprt.h:166
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
Definition: list.c:147
struct m0_net_domain * ntm_dom
Definition: net.h:853
static void mem_wf_state_change(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:35
struct m0_list xtm_work_list
Definition: mem_xprt.h:216
struct m0_net_end_point * xwi_nbe_ep
Definition: mem_xprt.h:179
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
enum m0_net_tm_state nte_next_state
Definition: net.h:723
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
m0_time_t m0_time_now(void)
Definition: time.c:134
M0_INTERNAL void m0_cond_signal(struct m0_cond *cond)
Definition: cond.c:94
static void mem_post_error(struct m0_net_transfer_mc *tm, int32_t status)
Definition: mem_xprt_tm.c:151
M0_INTERNAL void m0_net_tm_event_post(const struct m0_net_tm_event *ev)
Definition: tm.c:84
static void mem_xo_tm_worker(struct m0_net_transfer_mc *tm)
Definition: mem_xprt_tm.c:180
M0_INTERNAL bool m0_list_is_empty(const struct m0_list *head)
Definition: list.c:42
m0_net_bulk_mem_work_fn_t bmo_work_fn[M0_NET_XOP_NR]
Definition: mem_xprt.h:281
struct m0_net_end_point * nte_ep
Definition: net.h:729
struct m0_ref nep_ref
Definition: net.h:491
static uint32_t timeout
Definition: console.c:52
M0_INTERNAL void m0_cond_wait(struct m0_cond *cond)
Definition: cond.c:52
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL bool m0_net__tm_invariant(const struct m0_net_transfer_mc *tm)
Definition: tm.c:67
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
uint32_t xtm_callback_counter
Definition: mem_xprt.h:220
const struct m0_net_bulk_mem_ops * xd_ops
Definition: mem_xprt.h:338
M0_INTERNAL bool m0_cond_timedwait(struct m0_cond *cond, const m0_time_t abs_timeout)
Definition: cond.c:74
M0_INTERNAL void m0_cond_broadcast(struct m0_cond *cond)
Definition: cond.c:100
m0_net_bulk_mem_tm_state
Definition: mem_xprt.h:139
enum m0_net_tm_ev_type nte_type
Definition: net.h:691
m0_time_t nte_time
Definition: net.h:701
static struct m0_list_link * m0_list_first(const struct m0_list *head)
Definition: list.h:191
static struct m0_net_bulk_mem_domain_pvt * mem_dom_to_pvt(const struct m0_net_domain *dom)
Definition: mem_xprt.h:375
static struct m0_net_bulk_mem_tm_pvt * mem_tm_to_pvt(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt.h:231
void m0_free(void *data)
Definition: memory.c:146
int32_t rc
Definition: trigger_fop.h:47
#define m0_list_entry(link, type, member)
Definition: list.h:217
void(* m0_net_bulk_mem_work_fn_t)(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt.h:272
int32_t nte_status
Definition: net.h:715
static void mem_wf_error_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_tm.c:129