Motr  M0
chan.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2011-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/thread.h"
24 #include "lib/errno.h"
25 #include "lib/chan.h"
26 #include "lib/assert.h"
27 #include "addb2/addb2.h"
28 #include "addb2/counter.h"
29 #include "motr/magic.h"
30 #include "lib/arith.h" /* M0_CNT_DEC */
31 
58 M0_TL_DESCR_DEFINE(clink, "chan clinks", static, struct m0_clink, cl_linkage,
60 
61 M0_TL_DEFINE(clink, static, struct m0_clink);
62 
63 static bool clink_is_head(const struct m0_clink *clink)
64 {
65  return clink->cl_group == clink;
66 }
67 
68 M0_INTERNAL void m0_chan_lock(struct m0_chan *ch)
69 {
70  m0_mutex_lock(ch->ch_guard);
71 }
72 
73 M0_INTERNAL void m0_chan_unlock(struct m0_chan *ch)
74 {
75  m0_mutex_unlock(ch->ch_guard);
76 }
77 
78 M0_INTERNAL bool m0_chan_is_locked(const struct m0_chan *ch)
79 {
80  return m0_mutex_is_locked(ch->ch_guard);
81 }
82 
87 static bool m0_chan_invariant(struct m0_chan *chan)
88 {
89  return chan->ch_waiters == clink_tlist_length(&chan->ch_links) &&
91  scan->cl_chan == chan &&
92  scan->cl_group != NULL &&
93  clink_is_head(scan->cl_group));
94 }
95 
96 M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
97 {
98  M0_PRE(ch_guard != NULL);
99  *chan = (struct m0_chan){ .ch_guard = ch_guard };
100  clink_tlist_init(&chan->ch_links);
101 }
102 M0_EXPORTED(m0_chan_init);
103 
104 M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
105 {
107  M0_ASSERT(chan->ch_waiters == 0);
108  clink_tlist_fini(&chan->ch_links);
109 }
110 M0_EXPORTED(m0_chan_fini);
111 
112 M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
113 {
114  /*
115  * This seemingly useless lock-unlock pair is to synchronize with
116  * m0_chan_{signal,broadcast}() that might be still using chan.
117  */
121 }
122 M0_EXPORTED(m0_chan_fini_lock);
123 
124 static void clink_signal(struct m0_clink *clink)
125 {
126  struct m0_clink *grp = clink->cl_group;
127  struct m0_chan_addb2 *ca = clink->cl_chan->ch_addb2;
128  bool consumed = false;
129 
130  if (clink->cl_is_oneshot)
132  if (clink->cl_cb != NULL) {
134  if (ca == NULL)
135  consumed = clink->cl_cb(clink);
136  else
137  M0_ADDB2_HIST(ca->ca_cb, &ca->ca_cb_hist,
139  consumed = clink->cl_cb(clink));
140  m0_exit_awkward();
141  }
142  if (!consumed)
143  m0_semaphore_up(&grp->cl_wait);
144 }
145 
146 static void chan_signal_nr(struct m0_chan *chan, uint32_t nr)
147 {
148  struct m0_clink *clink;
149 
151  while (nr-- > 0 &&
152  (clink = clink_tlist_head(&chan->ch_links)) != NULL) {
153  clink_tlist_move_tail(&chan->ch_links, clink);
155  }
157 }
158 
159 M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
160 {
161  chan_signal_nr(chan, 1);
162 }
163 M0_EXPORTED(m0_chan_signal);
164 
165 M0_INTERNAL void m0_chan_signal_lock(struct m0_chan *chan)
166 {
170 }
171 
172 M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
173 {
175 }
176 M0_EXPORTED(m0_chan_broadcast);
177 
178 M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
179 {
183 }
184 
185 M0_INTERNAL bool m0_chan_has_waiters(struct m0_chan *chan)
186 {
187  return chan->ch_waiters > 0;
188 }
189 
190 static void clink_init(struct m0_clink *link,
191  struct m0_clink *group, m0_chan_cb_t cb)
192 {
193  link->cl_group = group;
194  link->cl_chan = NULL;
195  link->cl_cb = cb;
196  link->cl_is_oneshot = false;
197  clink_tlink_init(link);
199 }
200 
201 M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
202 {
203  clink_init(link, link, cb);
204  /* do NOT initialise the semaphore here */
205 }
206 M0_EXPORTED(m0_clink_init);
207 
208 M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
209 {
210  /* do NOT finalise the semaphore here */
211  clink_tlink_fini(link);
212 }
213 M0_EXPORTED(m0_clink_fini);
214 
215 M0_INTERNAL void m0_clink_attach(struct m0_clink *link,
216  struct m0_clink *group, m0_chan_cb_t cb)
217 {
219 
220  clink_init(link, group, cb);
221 }
222 M0_EXPORTED(m0_clink_attach);
223 
228 M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
229 {
230  int rc;
231 
233  M0_PRE(!m0_clink_is_armed(link));
234  /* head is registered first */
236 
237  link->cl_chan = chan;
238  if (clink_is_head(link)) {
239  rc = m0_semaphore_init(&link->cl_wait, 0);
240  M0_ASSERT(rc == 0);
241  }
242 
245  clink_tlist_add_tail(&chan->ch_links, link);
246  if (chan->ch_addb2 != NULL)
248  chan->ch_waiters);
250 
251  M0_POST(m0_clink_is_armed(link));
252 }
253 M0_EXPORTED(m0_clink_add);
254 
255 void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
256 {
258  m0_clink_add(chan, link);
260 }
261 M0_EXPORTED(m0_clink_add_lock);
262 
267 M0_INTERNAL void m0_clink_del(struct m0_clink *link)
268 {
269  struct m0_chan *chan = link->cl_chan;
270 
271  M0_PRE(m0_clink_is_armed(link));
273  /* head is de-registered last */
275 
278  clink_tlist_del(link);
279  if (chan->ch_addb2 != NULL)
281  chan->ch_waiters);
283  if (clink_is_head(link))
284  m0_semaphore_fini(&link->cl_wait);
285  /*
286  * Do not zero link->cl_chan: for one-short clinks channel should be
287  * still valid at the time of m0_chan_wait() call.
288  */
289  M0_POST(!m0_clink_is_armed(link));
290 }
291 M0_EXPORTED(m0_clink_del);
292 
293 M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
294 {
295  struct m0_chan *chan = link->cl_chan;
296 
298  m0_clink_del(link);
300 }
301 M0_EXPORTED(m0_clink_del_lock);
302 
303 M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
304 {
305  return link->cl_chan != NULL &&
306  link->cl_linkage.t_link.ll_next != NULL &&
308 }
309 
310 M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
311 {
312  if (link->cl_chan != NULL) {
313  m0_chan_lock(link->cl_chan);
315  m0_chan_unlock(link->cl_chan);
316  }
317 }
318 
319 M0_INTERNAL void m0_clink_cleanup_locked(struct m0_clink *link)
320 {
322  if (m0_clink_is_armed(link))
323  m0_clink_del(link);
324 }
325 
326 M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
327 {
329 }
330 
331 M0_INTERNAL bool m0_chan_trywait(struct m0_clink *link)
332 {
333  return m0_semaphore_trydown(&link->cl_group->cl_wait);
334 }
335 
336 M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
337 {
338  struct m0_chan_addb2 *ca = link->cl_chan->ch_addb2;
339 
340  if (ca == NULL)
342  else
344  m0_ptr_wrap(__builtin_return_address(0)),
346 }
347 M0_EXPORTED(m0_chan_wait);
348 
349 M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link,
350  const m0_time_t abs_timeout)
351 {
352  struct m0_chan_addb2 *ca = link->cl_chan->ch_addb2;
353  bool got;
354 
355  if (ca == NULL)
357  abs_timeout);
358  else
360  m0_ptr_wrap(__builtin_return_address(0)),
362  abs_timeout));
363  return got;
364 }
365 M0_EXPORTED(m0_chan_timedwait);
366 
369 /*
370  * Local variables:
371  * c-indentation-style: "K&R"
372  * c-basic-offset: 8
373  * tab-width: 8
374  * fill-column: 80
375  * scroll-step: 1
376  * End:
377  */
M0_INTERNAL void m0_enter_awkward(void)
Definition: kthread.c:249
uint64_t ca_wait
Definition: chan.h:450
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
M0_INTERNAL bool m0_chan_is_locked(const struct m0_chan *ch)
Definition: chan.c:78
M0_TL_DEFINE(clink, static, struct m0_clink)
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
static void chan_signal_nr(struct m0_chan *chan, uint32_t nr)
Definition: chan.c:146
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
struct m0_addb2_hist ca_queue_hist
Definition: chan.h:454
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL bool m0_semaphore_trydown(struct m0_semaphore *semaphore)
Definition: semaphore.c:60
M0_INTERNAL bool m0_chan_has_waiters(struct m0_chan *chan)
Definition: chan.c:185
static struct m0_sm_group * grp
Definition: bytecount.c:38
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_semaphore_timeddown(struct m0_semaphore *semaphore, const m0_time_t abs_timeout)
Definition: semaphore.c:75
static void clink_signal(struct m0_clink *clink)
Definition: chan.c:124
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
void m0_addb2_hist_mod(struct m0_addb2_hist *hist, int64_t val)
Definition: histogram.c:68
M0_INTERNAL void m0_chan_lock(struct m0_chan *ch)
Definition: chan.c:68
struct m0_tl ch_links
Definition: chan.h:233
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
#define M0_ASSERT_EX(cond)
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
struct m0_chan_addb2 * ch_addb2
Definition: chan.h:237
M0_INTERNAL void m0_clink_attach(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
Definition: chan.c:215
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
static bool m0_chan_invariant(struct m0_chan *chan)
Definition: chan.c:87
M0_INTERNAL void m0_clink_cleanup_locked(struct m0_clink *link)
Definition: chan.c:319
M0_INTERNAL bool m0_chan_trywait(struct m0_clink *link)
Definition: chan.c:331
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
#define M0_POST(cond)
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
Definition: chan.c:310
Definition: chan.h:229
static void group(void)
Definition: sm.c:386
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL void m0_chan_unlock(struct m0_chan *ch)
Definition: chan.c:73
M0_INTERNAL void m0_chan_signal_lock(struct m0_chan *chan)
Definition: chan.c:165
bool(* m0_chan_cb_t)(struct m0_clink *link)
Definition: chan.h:176
M0_INTERNAL bool m0_list_link_is_in(const struct m0_list_link *link)
Definition: list.c:181
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
static void clink_init(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
Definition: chan.c:190
uint32_t ch_waiters
Definition: chan.h:236
#define M0_CNT_INC(cnt)
Definition: arith.h:226
static struct m0_chan chan[RDWR_REQUEST_MAX]
struct m0_list_link t_link
Definition: tlist.h:266
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
Definition: chan.c:349
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
uint64_t ca_cb
Definition: chan.h:451
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
struct m0_addb2_hist ca_wait_hist
Definition: chan.h:452
M0_INTERNAL uint64_t m0_ptr_wrap(const void *p)
Definition: misc.c:277
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
Definition: chan.c:104
static bool clink_is_head(const struct m0_clink *clink)
Definition: chan.c:63
M0_INTERNAL void m0_exit_awkward(void)
Definition: kthread.c:262
struct m0_addb2_hist ca_cb_hist
Definition: chan.h:453
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
M0_TL_DESCR_DEFINE(clink, "chan clinks", static, struct m0_clink, cl_linkage, cl_magic, M0_LIB_CHAN_MAGIC, M0_LIB_CHAN_HEAD_MAGIC)
#define M0_PRE_EX(cond)
static int scan(struct scanner *s)
Definition: beck.c:963
#define M0_ADDB2_HIST(id, hist, datum,...)
Definition: histogram.h:147
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
Definition: mutex.h:47
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
#define M0_POST_EX(cond)
struct m0_mutex * ch_guard
Definition: chan.h:231
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735