Motr  M0
queue.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
31 #include "lib/trace.h"
32 
33 #include "be/queue.h"
34 
35 #include "lib/errno.h" /* ENOMEM */
36 #include "lib/memory.h" /* M0_ALLOC_ARR */
37 #include "lib/buf.h" /* m0_buf_memcpy */
38 #include "lib/misc.h" /* ergo */
39 
40 #include "be/op.h" /* m0_be_op_active */
41 
42 
43 struct be_queue_item {
44  uint64_t bqi_magic;
47  char *bqi_data[];
48 };
49 
50 #define BE_QUEUE_ITEM2BUF(bq, bqi) \
51  M0_BUF_INIT((bq)->bq_cfg.bqc_item_length, &(bqi)->bqi_data)
52 
54  struct m0_be_op *bbo_op;
56  struct m0_buf bbo_data;
58  uint64_t bbo_magic;
60 };
61 
62 M0_TL_DESCR_DEFINE(bqq, "m0_be_queue::bq_q*[]", static,
63  struct be_queue_item, bqi_link, bqi_magic,
65 M0_TL_DEFINE(bqq, static, struct be_queue_item);
66 
67 M0_TL_DESCR_DEFINE(bqop, "m0_be_queue::bq_op_*[]", static,
68  struct be_queue_wait_op, bbo_link, bbo_magic,
70 M0_TL_DEFINE(bqop, static, struct be_queue_wait_op);
71 
72 
73 static uint64_t bq_queue_items_max(struct m0_be_queue *bq)
74 {
76 }
77 
78 static struct be_queue_item *be_queue_qitem(struct m0_be_queue *bq,
79  uint64_t index)
80 {
82  return (struct be_queue_item *)
83  (bq->bq_qitems + index *
84  (sizeof(struct be_queue_item) + bq->bq_cfg.bqc_item_length));
85 }
86 
87 static bool be_queue_invariant(struct m0_be_queue *bq)
88 {
90 
91  return bq->bq_enqueued >= bq->bq_dequeued;
92 }
93 
94 M0_INTERNAL int m0_be_queue_init(struct m0_be_queue *bq,
95  struct m0_be_queue_cfg *cfg)
96 {
97  uint64_t i;
98 
99  M0_ENTRY("bq=%p bqc_q_size_max=%" PRIu64 " "
100  "bqc_producers_nr_max=%" PRIu64 " bqc_consumers_nr_max=%"PRIu64,
101  bq, cfg->bqc_q_size_max,
103  M0_PRE(M0_IS0(bq));
104  M0_PRE(cfg->bqc_q_size_max > 0);
105  M0_PRE(cfg->bqc_producers_nr_max > 0);
106  M0_PRE(cfg->bqc_consumers_nr_max > 0);
107  M0_PRE(cfg->bqc_item_length > 0);
109 
110  bq->bq_cfg = *cfg;
111  bq->bq_the_end = false;
112  bq->bq_enqueued = 0;
113  bq->bq_dequeued = 0;
115  (sizeof(struct be_queue_item) + cfg->bqc_item_length));
118  if (bq->bq_qitems == NULL ||
119  bq->bq_ops_put == NULL ||
120  bq->bq_ops_get == NULL) {
121  m0_free(bq->bq_ops_get);
122  m0_free(bq->bq_ops_put);
123  m0_free(bq->bq_qitems);
124  return M0_ERR(-ENOMEM);
125  }
126  m0_mutex_init(&bq->bq_lock);
127  bqop_tlist_init(&bq->bq_op_put_unused);
128  for (i = 0; i < bq->bq_cfg.bqc_producers_nr_max; ++i) {
129  bqop_tlink_init_at_tail(&bq->bq_ops_put[i],
130  &bq->bq_op_put_unused);
131  }
132  bqop_tlist_init(&bq->bq_op_put);
133  bqop_tlist_init(&bq->bq_op_get_unused);
134  for (i = 0; i < bq->bq_cfg.bqc_consumers_nr_max; ++i) {
135  bqop_tlink_init_at_tail(&bq->bq_ops_get[i],
136  &bq->bq_op_get_unused);
137  }
138  bqop_tlist_init(&bq->bq_op_get);
139  bqq_tlist_init(&bq->bq_q_unused);
140  for (i = 0; i < bq_queue_items_max(bq); ++i) {
141  bqq_tlink_init_at_tail(be_queue_qitem(bq, i),
142  &bq->bq_q_unused);
143  }
144  bqq_tlist_init(&bq->bq_q);
145  return M0_RC(0);
146 }
147 
148 M0_INTERNAL void m0_be_queue_fini(struct m0_be_queue *bq)
149 {
150  struct be_queue_wait_op *bwo;
151  struct be_queue_item *bqi;
152  uint64_t i;
153 
154  M0_ENTRY("bq="BEQ_F, BEQ_P(bq));
156  "bq="BEQ_F, BEQ_P(bq));
157 
158  m0_tl_for(bqq, &bq->bq_q, bqi) {
159  /*
160  * M0_LOG() couldn't print the item buffer at once,
161  * unfortunately. So let's just at least show the number of
162  * items by printing every item.
163  */
164  M0_LOG(M0_ERROR, "there is an item in the queue");
165  } m0_tl_endfor;
166  bqq_tlist_fini(&bq->bq_q);
167  m0_tl_for(bqop, &bq->bq_op_get, bwo) {
168  M0_LOG(M0_ERROR, "bq=%p bbo_data="BUF_F,
169  bq, BUF_P(&bwo->bbo_data));
170  } m0_tl_endfor;
171  bqop_tlist_fini(&bq->bq_op_get);
172  for (i = 0; i < bq->bq_cfg.bqc_consumers_nr_max; ++i)
173  bqop_tlink_del_fini(&bq->bq_ops_get[i]);
174  bqop_tlist_fini(&bq->bq_op_get_unused);
175  /* if there was nothing in bq_q then the following list is empty */
176  bqop_tlist_fini(&bq->bq_op_put);
177  for (i = 0; i < bq->bq_cfg.bqc_producers_nr_max; ++i)
178  bqop_tlink_del_fini(&bq->bq_ops_put[i]);
179  bqop_tlist_fini(&bq->bq_op_put_unused);
180  for (i = 0; i < bq_queue_items_max(bq); ++i)
181  bqq_tlink_del_fini(be_queue_qitem(bq, i));
182  bqq_tlist_fini(&bq->bq_q_unused);
183  m0_mutex_fini(&bq->bq_lock);
184  m0_free(bq->bq_ops_put);
185  m0_free(bq->bq_ops_get);
186  m0_free(bq->bq_qitems);
187  M0_LEAVE();
188 }
189 
190 M0_INTERNAL void m0_be_queue_lock(struct m0_be_queue *bq)
191 {
192  m0_mutex_lock(&bq->bq_lock);
193 }
194 
195 M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
196 {
197  m0_mutex_unlock(&bq->bq_lock);
198 }
199 
200 static uint64_t be_queue_items_nr(struct m0_be_queue *bq)
201 {
204  "bq="BEQ_F, BEQ_P(bq));
205  return bq->bq_enqueued - bq->bq_dequeued;
206 }
207 
208 static bool be_queue_is_empty(struct m0_be_queue *bq)
209 {
210  return be_queue_items_nr(bq) == 0;
211 }
212 
213 static bool be_queue_is_full(struct m0_be_queue *bq)
214 {
215  return be_queue_items_nr(bq) >= bq->bq_cfg.bqc_q_size_max;
216 }
217 
218 static struct be_queue_item *be_queue_q_put(struct m0_be_queue *bq,
219  const struct m0_buf *data)
220 {
221  struct be_queue_item *bqi;
222 
224 
225  bqi = bqq_tlist_head(&bq->bq_q_unused);
227  bqq_tlist_move_tail(&bq->bq_q, bqi);
228  ++bq->bq_enqueued;
229  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
230  return bqi;
231 }
232 
233 static void be_queue_q_peek(struct m0_be_queue *bq, struct m0_buf *data)
234 {
235  struct be_queue_item *bqi;
236 
239 
240  bqi = bqq_tlist_head(&bq->bq_q);
242  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
243 }
244 
245 static void be_queue_q_get(struct m0_be_queue *bq,
246  struct m0_buf *data,
247  bool *successful)
248 {
249  struct be_queue_item *bqi;
250 
253 
254  bqi = bqq_tlist_head(&bq->bq_q);
256  *successful = true;
257  bqq_tlist_move(&bq->bq_q_unused, bqi);
258  ++bq->bq_dequeued;
259  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
260 }
261 
262 static void be_queue_op_put(struct m0_be_queue *bq,
263  struct m0_be_op *op,
264  struct be_queue_item *bqi)
265 {
266  struct be_queue_wait_op *bwo;
267 
269  M0_PRE(!bqop_tlist_is_empty(&bq->bq_op_put_unused));
270 
271  bwo = bqop_tlist_head(&bq->bq_op_put_unused);
272  M0_ASSERT_INFO(bwo != NULL,
273  "Too many producers: bqc_producers_nr_max=%"PRIu64,
275  bwo->bbo_bqi = bqi;
276  bwo->bbo_op = op;
277  bqop_tlist_move_tail(&bq->bq_op_put, bwo);
278  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
279 }
280 
281 static void be_queue_op_put_done(struct m0_be_queue *bq)
282 {
283  struct be_queue_wait_op *bwo;
284 
286  M0_PRE(!bqop_tlist_is_empty(&bq->bq_op_put));
287 
288  bwo = bqop_tlist_head(&bq->bq_op_put);
289  m0_be_op_done(bwo->bbo_op);
290  bqop_tlist_move(&bq->bq_op_put_unused, bwo);
291  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
292 }
293 
295 {
296  return !bqop_tlist_is_empty(&bq->bq_op_put);
297 }
298 
299 static void be_queue_op_get(struct m0_be_queue *bq,
300  struct m0_be_op *op,
301  struct m0_buf *data,
302  bool *successful)
303 {
304  struct be_queue_wait_op *bwo;
305 
307  M0_PRE(!bqop_tlist_is_empty(&bq->bq_op_get_unused));
308 
309  bwo = bqop_tlist_head(&bq->bq_op_get_unused);
310  M0_ASSERT_INFO(bwo != NULL,
311  "Too many consumers: bqc_consumers_nr_max=%"PRIu64,
313  bwo->bbo_data = *data;
314  bwo->bbo_successful = successful;
315  bwo->bbo_op = op;
316  bqop_tlist_move_tail(&bq->bq_op_get, bwo);
317  M0_LOG(M0_DEBUG, "bq="BEQ_F, BEQ_P(bq));
318 }
319 
320 static void be_queue_op_get_done(struct m0_be_queue *bq, bool success)
321 {
322  struct be_queue_wait_op *bwo;
323 
324  M0_ENTRY("bq="BEQ_F" success=%d", BEQ_P(bq), !!success);
326  M0_PRE(!bqop_tlist_is_empty(&bq->bq_op_get));
327 
328  bwo = bqop_tlist_head(&bq->bq_op_get);
329  if (success) {
330  be_queue_q_get(bq, &bwo->bbo_data, bwo->bbo_successful);
331  } else {
332  *bwo->bbo_successful = false;
333  }
334  m0_be_op_done(bwo->bbo_op);
335  bqop_tlist_move(&bq->bq_op_get_unused, bwo);
336  M0_LEAVE("bq="BEQ_F, BEQ_P(bq));
337 }
338 
340 {
341  return !bqop_tlist_is_empty(&bq->bq_op_get);
342 }
343 
344 M0_INTERNAL void m0_be_queue_put(struct m0_be_queue *bq,
345  struct m0_be_op *op,
346  const struct m0_buf *data)
347 {
348  struct be_queue_item *bqi;
349  bool was_full;
350 
351  M0_ENTRY("bq="BEQ_F, BEQ_P(bq));
354  M0_PRE(!bq->bq_the_end);
355  M0_PRE(data->b_nob == bq->bq_cfg.bqc_item_length);
356 
358  was_full = be_queue_is_full(bq);
359  bqi = be_queue_q_put(bq, data);
360  if (was_full) {
361  be_queue_op_put(bq, op, bqi);
362  } else {
363  m0_be_op_done(op);
364  }
365  /*
366  * Shortcut for this case hasn't been not done intentionally.
367  * It's much easier to look at the logs when all items are always added
368  * to the queue.
369  */
371  be_queue_op_get_done(bq, true);
372 
374  M0_LEAVE("bq="BEQ_F, BEQ_P(bq));
375 }
376 
377 M0_INTERNAL void m0_be_queue_end(struct m0_be_queue *bq)
378 {
379  M0_ENTRY("bq="BEQ_F, BEQ_P(bq));
382  M0_PRE(!bq->bq_the_end);
384 
385  bq->bq_the_end = true;
386  while (be_queue_op_get_is_waiting(bq))
387  be_queue_op_get_done(bq, false);
389  M0_LEAVE("bq="BEQ_F, BEQ_P(bq));
390 }
391 
392 M0_INTERNAL void m0_be_queue_get(struct m0_be_queue *bq,
393  struct m0_be_op *op,
394  struct m0_buf *data,
395  bool *successful)
396 {
399  M0_PRE(data->b_nob == bq->bq_cfg.bqc_item_length);
400 
401  M0_ENTRY("bq="BEQ_F, BEQ_P(bq));
403  if (be_queue_is_empty(bq) && bq->bq_the_end) {
404  *successful = false;
405  m0_be_op_done(op);
406  M0_LEAVE();
407  return;
408  }
410  be_queue_op_get(bq, op, data, successful);
411  M0_LEAVE();
412  return;
413  }
414  be_queue_q_get(bq, data, successful);
415  m0_be_op_done(op);
419  M0_LEAVE("bq="BEQ_F, BEQ_P(bq));
420 }
421 
422 M0_INTERNAL bool m0_be_queue_peek(struct m0_be_queue *bq,
423  struct m0_buf *data)
424 {
427  M0_PRE(data->b_nob == bq->bq_cfg.bqc_item_length);
428 
429  if (be_queue_is_empty(bq) ||
431  M0_LOG(M0_DEBUG, "bq=%p the queue is empty", bq);
432  return false;
433  }
434  be_queue_q_peek(bq, data);
436  M0_LEAVE("bq="BEQ_F, BEQ_P(bq));
437  return true;
438 }
439 
440 #undef BE_QUEUE_ITEM2BUF
441 
442 #undef M0_TRACE_SUBSYSTEM
443 
446 /*
447  * Local variables:
448  * c-indentation-style: "K&R"
449  * c-basic-offset: 8
450  * tab-width: 8
451  * fill-column: 80
452  * scroll-step: 1
453  * End:
454  */
455 /*
456  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
457  */
uint64_t bqc_consumers_nr_max
Definition: queue.h:127
M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
Definition: queue.c:195
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0_buf bbo_data
Definition: queue.c:56
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
#define NULL
Definition: misc.h:38
struct m0_tl bq_op_get
Definition: queue.h:154
#define ergo(a, b)
Definition: misc.h:293
static bool be_queue_op_get_is_waiting(struct m0_be_queue *bq)
Definition: queue.c:339
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static void be_queue_op_put(struct m0_be_queue *bq, struct m0_be_op *op, struct be_queue_item *bqi)
Definition: queue.c:262
uint64_t bq_dequeued
Definition: queue.h:165
#define BEQ_P(bq)
Definition: queue.h:170
uint64_t bbo_magic
Definition: queue.c:58
static uint64_t be_queue_items_nr(struct m0_be_queue *bq)
Definition: queue.c:200
struct m0_bufvec data
Definition: di.c:40
M0_INTERNAL void m0_be_queue_put(struct m0_be_queue *bq, struct m0_be_op *op, const struct m0_buf *data)
Definition: queue.c:344
static bool be_queue_op_put_is_waiting(struct m0_be_queue *bq)
Definition: queue.c:294
struct m0_tl bq_q
Definition: queue.h:136
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static bool be_queue_invariant(struct m0_be_queue *bq)
Definition: queue.c:87
bool bq_the_end
Definition: queue.h:162
M0_INTERNAL void m0_buf_memcpy(struct m0_buf *dst, const struct m0_buf *src)
Definition: buf.c:96
#define m0_tl_endfor
Definition: tlist.h:700
#define BUF_F
Definition: buf.h:75
#define BEQ_F
Definition: queue.h:168
return M0_RC(rc)
op
Definition: libdemo.c:64
struct m0_mutex bq_lock
Definition: queue.h:133
#define M0_ENTRY(...)
Definition: trace.h:170
Definition: buf.h:37
M0_INTERNAL int m0_be_queue_init(struct m0_be_queue *bq, struct m0_be_queue_cfg *cfg)
Definition: queue.c:94
M0_TL_DEFINE(bqq, static, struct be_queue_item)
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
return M0_ERR(-EOPNOTSUPP)
static void be_queue_op_put_done(struct m0_be_queue *bq)
Definition: queue.c:281
#define BE_QUEUE_ITEM2BUF(bq, bqi)
Definition: queue.c:50
static void be_queue_op_get_done(struct m0_be_queue *bq, bool success)
Definition: queue.c:320
static struct be_queue_item * be_queue_q_put(struct m0_be_queue *bq, const struct m0_buf *data)
Definition: queue.c:218
uint64_t bqc_q_size_max
Definition: queue.h:125
M0_INTERNAL void m0_be_queue_fini(struct m0_be_queue *bq)
Definition: queue.c:148
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL void m0_be_queue_get(struct m0_be_queue *bq, struct m0_be_op *op, struct m0_buf *data, bool *successful)
Definition: queue.c:392
M0_INTERNAL bool m0_be_queue_peek(struct m0_be_queue *bq, struct m0_buf *data)
Definition: queue.c:422
struct be_queue_wait_op * bq_ops_put
Definition: queue.h:160
struct m0_tl bq_op_put_unused
Definition: queue.h:153
M0_INTERNAL void m0_be_queue_lock(struct m0_be_queue *bq)
Definition: queue.c:190
static struct be_queue_item * be_queue_qitem(struct m0_be_queue *bq, uint64_t index)
Definition: queue.c:78
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
struct m0_tlink bqi_link
Definition: queue.c:46
#define M0_POST(cond)
struct m0_be_queue_cfg bq_cfg
Definition: queue.h:132
M0_INTERNAL void m0_be_op_done(struct m0_be_op *op)
Definition: stubs.c:104
struct m0_tl bq_q_unused
Definition: queue.h:137
struct be_queue_wait_op * bq_ops_get
Definition: queue.h:158
bool * bbo_successful
Definition: queue.c:57
static bool be_queue_is_full(struct m0_be_queue *bq)
Definition: queue.c:213
char * bq_qitems
Definition: queue.h:149
M0_INTERNAL void m0_be_op_active(struct m0_be_op *op)
Definition: stubs.c:100
m0_bcount_t bqc_item_length
Definition: queue.h:128
#define M0_IS0(obj)
Definition: misc.h:70
struct m0_tlink bbo_link
Definition: queue.c:59
static void be_queue_q_peek(struct m0_be_queue *bq, struct m0_buf *data)
Definition: queue.c:233
#define BUF_P(p)
Definition: buf.h:76
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
uint64_t bqi_magic
Definition: queue.c:44
static void be_queue_op_get(struct m0_be_queue *bq, struct m0_be_op *op, struct m0_buf *data, bool *successful)
Definition: queue.c:299
static bool be_queue_is_empty(struct m0_be_queue *bq)
Definition: queue.c:208
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_be_op * bbo_op
Definition: queue.c:54
#define M0_IS_8ALIGNED(val)
Definition: arith.h:190
Definition: op.h:74
uint64_t bqc_producers_nr_max
Definition: queue.h:126
M0_INTERNAL void m0_be_queue_end(struct m0_be_queue *bq)
Definition: queue.c:377
struct m0_tl bq_op_get_unused
Definition: queue.h:155
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
M0_TL_DESCR_DEFINE(bqq, "m0_be_queue::bq_q*[]", static, struct be_queue_item, bqi_link, bqi_magic, M0_BE_QUEUE_Q_MAGIC, M0_BE_QUEUE_Q_HEAD_MAGIC)
static uint64_t bq_queue_items_max(struct m0_be_queue *bq)
Definition: queue.c:73
static void be_queue_q_get(struct m0_be_queue *bq, struct m0_buf *data, bool *successful)
Definition: queue.c:245
char * bqi_data[]
Definition: queue.c:47
struct m0_tl bq_op_put
Definition: queue.h:152
uint64_t bq_enqueued
Definition: queue.h:164
struct be_queue_item * bbo_bqi
Definition: queue.c:55