Motr  M0
lq.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_HA
30 #include "lib/trace.h"
31 
32 #include "ha/lq.h"
33 
34 #include "lib/misc.h" /* M0_IS0 */
35 #include "lib/assert.h" /* M0_PRE */
36 
37 #include "ha/msg.h" /* M0_HA_MSG_TAG_UNKNOWN */
38 
39 static bool ha_lq_tags_invariant(const struct m0_ha_link_tags *tags)
40 {
41  return tags->hlt_confirmed % 2 == tags->hlt_delivered % 2 &&
42  tags->hlt_confirmed % 2 == tags->hlt_next % 2 &&
43  tags->hlt_confirmed % 2 == tags->hlt_assign % 2 &&
44  tags->hlt_confirmed <= tags->hlt_delivered &&
45  tags->hlt_delivered <= tags->hlt_next &&
46  tags->hlt_next <= tags->hlt_assign &&
47  equi(tags->hlt_confirmed == 0, tags->hlt_delivered == 0) &&
48  equi(tags->hlt_confirmed == 0, tags->hlt_next == 0) &&
49  equi(tags->hlt_confirmed == 0, tags->hlt_assign == 0);
50 
51 }
52 
53 M0_INTERNAL bool m0_ha_lq_invariant(const struct m0_ha_lq *lq)
54 {
55  return ha_lq_tags_invariant(&lq->hlq_tags);
56 }
57 
58 M0_INTERNAL void m0_ha_lq_init(struct m0_ha_lq *lq,
59  const struct m0_ha_lq_cfg *lq_cfg)
60 {
61  M0_PRE(M0_IS0(lq));
62  lq->hlq_cfg = *lq_cfg;
65 }
66 
67 M0_INTERNAL void m0_ha_lq_fini(struct m0_ha_lq *lq)
68 {
71  lq->hlq_tags.hlt_delivered) &&
73  lq->hlq_tags.hlt_next) &&
75  lq->hlq_tags.hlt_assign),
76  "lq->hlq_tags="HLTAGS_F, HLTAGS_P(&lq->hlq_tags));
78 }
79 
80 M0_INTERNAL void m0_ha_lq_tags_get(const struct m0_ha_lq *lq,
81  struct m0_ha_link_tags *tags)
82 {
84  *tags = lq->hlq_tags;
85 }
86 
87 M0_INTERNAL void m0_ha_lq_tags_set(struct m0_ha_lq *lq,
88  const struct m0_ha_link_tags *tags)
89 {
91  M0_ENTRY("lq->hlq_tags="HLTAGS_F" tags="HLTAGS_F,
92  HLTAGS_P(&lq->hlq_tags), HLTAGS_P(tags));
93  if (lq->hlq_tags.hlt_confirmed == 0) {
94  lq->hlq_tags = *tags;
95  } else {
96  M0_IMPOSSIBLE("can only set tags just after init()");
97  }
99 }
100 
101 M0_INTERNAL bool m0_ha_lq_has_tag(const struct m0_ha_lq *lq, uint64_t tag)
102 {
104  M0_PRE(lq->hlq_tags.hlt_confirmed > 0);
105 
106  return tag % 2 == lq->hlq_tags.hlt_confirmed % 2;
107 }
108 
109 M0_INTERNAL struct m0_ha_msg *m0_ha_lq_msg(struct m0_ha_lq *lq, uint64_t tag)
110 {
111  struct m0_ha_msg_qitem *qitem;
112 
115  qitem = m0_ha_msg_queue_find(&lq->hlq_mq, tag);
116  return qitem == NULL ? NULL : &qitem->hmq_msg;
117 }
118 
119 M0_INTERNAL struct m0_ha_msg *m0_ha_lq_msg_next(struct m0_ha_lq *lq,
120  const struct m0_ha_msg *cur)
121 {
122  struct m0_ha_msg_qitem *qitem =
124 
126  qitem = m0_ha_msg_queue_next(&lq->hlq_mq, qitem);
127  return qitem == NULL ? NULL : &qitem->hmq_msg;
128 }
129 
130 M0_INTERNAL struct m0_ha_msg *m0_ha_lq_msg_prev(struct m0_ha_lq *lq,
131  const struct m0_ha_msg *cur)
132 {
133  struct m0_ha_msg_qitem *qitem =
135 
137  qitem = m0_ha_msg_queue_prev(&lq->hlq_mq, qitem);
138  return qitem == NULL ? NULL : &qitem->hmq_msg;
139 }
140 
141 M0_INTERNAL bool m0_ha_lq_has_next(const struct m0_ha_lq *lq)
142 {
144 
145  return lq->hlq_tags.hlt_next < lq->hlq_tags.hlt_assign;
146 }
147 
148 M0_INTERNAL bool m0_ha_lq_is_delivered(const struct m0_ha_lq *lq, uint64_t tag)
149 {
152 
153  return tag < lq->hlq_tags.hlt_delivered;
154 }
155 
156 M0_INTERNAL uint64_t m0_ha_lq_tag_assign(const struct m0_ha_lq *lq)
157 {
159  return lq->hlq_tags.hlt_assign;
160 }
161 
162 M0_INTERNAL uint64_t m0_ha_lq_tag_next(const struct m0_ha_lq *lq)
163 {
165  return lq->hlq_tags.hlt_next;
166 }
167 
168 M0_INTERNAL uint64_t m0_ha_lq_tag_delivered(const struct m0_ha_lq *lq)
169 {
171  return lq->hlq_tags.hlt_delivered;
172 }
173 
174 M0_INTERNAL uint64_t m0_ha_lq_tag_confirmed(const struct m0_ha_lq *lq)
175 {
177  return lq->hlq_tags.hlt_confirmed;
178 }
179 
180 M0_INTERNAL uint64_t m0_ha_lq_enqueue(struct m0_ha_lq *lq,
181  const struct m0_ha_msg *msg)
182 {
183  struct m0_ha_msg_qitem *qitem;
184  uint64_t tag;
185 
187  qitem = m0_ha_msg_queue_alloc(&lq->hlq_mq);
188  M0_ASSERT(qitem != NULL); /* XXX */
189  qitem->hmq_msg = *msg;
190  tag = m0_ha_msg_tag(msg);
191  if (tag == M0_HA_MSG_TAG_UNKNOWN) {
192  qitem->hmq_msg.hm_tag = lq->hlq_tags.hlt_assign;
194  } else {
196  }
197  lq->hlq_tags.hlt_assign += 2;
198  m0_ha_msg_queue_enqueue(&lq->hlq_mq, qitem);
200  return m0_ha_msg_tag(&qitem->hmq_msg);
201 }
202 
203 M0_INTERNAL struct m0_ha_msg *m0_ha_lq_next(struct m0_ha_lq *lq)
204 {
205  struct m0_ha_msg *msg;
206 
208  if (lq->hlq_tags.hlt_next == lq->hlq_tags.hlt_assign)
209  return NULL;
210  msg = m0_ha_lq_msg(lq, lq->hlq_tags.hlt_next);
211  lq->hlq_tags.hlt_next += 2;
213  return msg;
214 }
215 
216 M0_INTERNAL bool m0_ha_lq_try_unnext(struct m0_ha_lq *lq)
217 {
218  bool done;
219 
222  if (done)
223  lq->hlq_tags.hlt_next -= 2;
225  return done;
226 }
227 
228 M0_INTERNAL void m0_ha_lq_mark_delivered(struct m0_ha_lq *lq, uint64_t tag)
229 {
230  struct m0_ha_msg *msg;
231  struct m0_ha_msg_qitem *qitem;
232 
234 
235  msg = m0_ha_lq_msg(lq, tag);
236  M0_ASSERT(msg != NULL);
237  M0_ASSERT(lq->hlq_tags.hlt_delivered <= msg->hm_tag);
238  M0_ASSERT(msg->hm_tag <= lq->hlq_tags.hlt_next);
239 
240  qitem = m0_ha_msg_queue_find(&lq->hlq_mq, tag);
242 
244  M0_ASSERT(qitem != NULL);
245  for (; qitem != NULL;
246  qitem = m0_ha_msg_queue_next(&lq->hlq_mq, qitem)) {
247  if (lq->hlq_tags.hlt_delivered == lq->hlq_tags.hlt_next ||
249  break;
250 
251  lq->hlq_tags.hlt_delivered = m0_ha_msg_tag(&qitem->hmq_msg) + 2;
252  }
254 }
255 
256 M0_INTERNAL uint64_t m0_ha_lq_dequeue(struct m0_ha_lq *lq)
257 {
258  struct m0_ha_msg_qitem *qitem;
259  uint64_t tag;
260 
263  qitem = m0_ha_msg_queue_dequeue(&lq->hlq_mq);
264  M0_ASSERT(qitem != NULL);
265  tag = m0_ha_msg_tag(&qitem->hmq_msg);
266  m0_ha_msg_queue_free(&lq->hlq_mq, qitem);
268  lq->hlq_tags.hlt_confirmed += 2;
269  } else {
271  }
273  return tag;
274 }
275 
276 #undef M0_TRACE_SUBSYSTEM
277 
280 /*
281  * Local variables:
282  * c-indentation-style: "K&R"
283  * c-basic-offset: 8
284  * tab-width: 8
285  * fill-column: 80
286  * scroll-step: 1
287  * End:
288  */
289 /*
290  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
291  */
M0_INTERNAL bool m0_ha_lq_has_tag(const struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:101
M0_INTERNAL bool m0_ha_lq_try_unnext(struct m0_ha_lq *lq)
Definition: lq.c:216
#define M0_PRE(cond)
M0_INTERNAL uint64_t m0_ha_lq_enqueue(struct m0_ha_lq *lq, const struct m0_ha_msg *msg)
Definition: lq.c:180
M0_INTERNAL void m0_ha_lq_fini(struct m0_ha_lq *lq)
Definition: lq.c:67
static bool ha_lq_tags_invariant(const struct m0_ha_link_tags *tags)
Definition: lq.c:39
#define NULL
Definition: misc.h:38
M0_INTERNAL struct m0_ha_msg * m0_ha_lq_next(struct m0_ha_lq *lq)
Definition: lq.c:203
M0_INTERNAL void m0_ha_lq_tags_set(struct m0_ha_lq *lq, const struct m0_ha_link_tags *tags)
Definition: lq.c:87
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
struct m0_ha_lq_cfg hlq_cfg
Definition: lq.h:98
M0_INTERNAL uint64_t m0_ha_lq_tag_delivered(const struct m0_ha_lq *lq)
Definition: lq.c:168
struct m0_ha_msg_queue_cfg hlqc_msg_queue_cfg
Definition: lq.h:90
M0_INTERNAL struct m0_ha_msg_qitem * m0_ha_msg_queue_dequeue(struct m0_ha_msg_queue *mq)
Definition: msg_queue.c:75
M0_INTERNAL uint64_t m0_ha_lq_dequeue(struct m0_ha_lq *lq)
Definition: lq.c:256
struct m0_ha_msg_queue hlq_mq
Definition: lq.h:99
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_ha_msg_queue_enqueue(struct m0_ha_msg_queue *mq, struct m0_ha_msg_qitem *qitem)
Definition: msg_queue.c:68
M0_INTERNAL void m0_ha_lq_tags_get(const struct m0_ha_lq *lq, struct m0_ha_link_tags *tags)
Definition: lq.c:80
M0_INTERNAL bool m0_ha_lq_has_next(const struct m0_ha_lq *lq)
Definition: lq.c:141
M0_INTERNAL struct m0_ha_msg_qitem * m0_ha_msg_queue_find(struct m0_ha_msg_queue *mq, uint64_t tag)
Definition: msg_queue.c:97
#define equi(a, b)
Definition: misc.h:297
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL bool m0_ha_lq_is_delivered(const struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:148
#define HLTAGS_P(_tags)
Definition: link_fops.h:77
Definition: lq.h:97
M0_INTERNAL void m0_ha_msg_queue_fini(struct m0_ha_msg_queue *mq)
Definition: msg_queue.c:48
M0_INTERNAL void m0_ha_lq_mark_delivered(struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:228
M0_INTERNAL void m0_ha_msg_queue_free(struct m0_ha_msg_queue *mq, struct m0_ha_msg_qitem *qitem)
Definition: msg_queue.c:62
#define M0_ASSERT(cond)
M0_INTERNAL uint64_t m0_ha_lq_tag_confirmed(const struct m0_ha_lq *lq)
Definition: lq.c:174
M0_INTERNAL struct m0_ha_msg * m0_ha_lq_msg_prev(struct m0_ha_lq *lq, const struct m0_ha_msg *cur)
Definition: lq.c:130
M0_INTERNAL bool m0_ha_lq_invariant(const struct m0_ha_lq *lq)
Definition: lq.c:53
uint64_t hm_tag
Definition: msg.h:125
M0_INTERNAL struct m0_ha_msg_qitem * m0_ha_msg_queue_alloc(struct m0_ha_msg_queue *mq)
Definition: msg_queue.c:54
Definition: msg.h:115
#define M0_POST(cond)
M0_INTERNAL uint64_t m0_ha_msg_tag(const struct m0_ha_msg *msg)
Definition: msg.c:36
M0_INTERNAL void m0_ha_lq_init(struct m0_ha_lq *lq, const struct m0_ha_lq_cfg *lq_cfg)
Definition: lq.c:58
M0_INTERNAL uint64_t m0_ha_lq_tag_assign(const struct m0_ha_lq *lq)
Definition: lq.c:156
#define HLTAGS_F
Definition: link_fops.h:75
struct m0_ha_link_tags hlq_tags
Definition: lq.h:100
M0_INTERNAL struct m0_ha_msg_qitem * m0_ha_msg_queue_next(struct m0_ha_msg_queue *mq, const struct m0_ha_msg_qitem *cur)
Definition: msg_queue.c:105
#define M0_IS0(obj)
Definition: misc.h:70
M0_INTERNAL struct m0_ha_msg_qitem * m0_ha_msg_queue_prev(struct m0_ha_msg_queue *mq, const struct m0_ha_msg_qitem *cur)
Definition: msg_queue.c:113
M0_INTERNAL struct m0_ha_msg * m0_ha_lq_msg_next(struct m0_ha_lq *lq, const struct m0_ha_msg *cur)
Definition: lq.c:119
M0_INTERNAL struct m0_ha_msg * m0_ha_lq_msg(struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:109
#define _0C(exp)
Definition: assert.h:311
enum m0_ha_msg_qitem_delivery_state hmq_delivery_state
Definition: msg_queue.h:51
#define M0_ASSERT_INFO(cond, fmt,...)
static unsigned done
Definition: storage.c:91
M0_INTERNAL uint64_t m0_ha_lq_tag_next(const struct m0_ha_lq *lq)
Definition: lq.c:162
M0_INTERNAL void m0_ha_msg_queue_init(struct m0_ha_msg_queue *mq, struct m0_ha_msg_queue_cfg *cfg)
Definition: msg_queue.c:42
#define M0_IMPOSSIBLE(fmt,...)
struct m0_ha_msg hmq_msg
Definition: msg_queue.h:50