Motr  M0
consumer.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_ADDB
31 
32 #include "lib/assert.h"
33 #include "lib/misc.h" /* M0_IS0 */
34 #include "motr/magic.h"
35 
36 #include "addb2/consumer.h"
37 #include "addb2/addb2.h"
38 #include "addb2/internal.h"
39 
40 M0_TL_DESCR_DEFINE(philter, "addb2 source philters",
41  static, struct m0_addb2_philter, ph_linkage, ph_magix,
43 M0_TL_DEFINE(philter, static, struct m0_addb2_philter);
44 
45 M0_TL_DESCR_DEFINE(callback, "addb2 philter callbacks",
46  static, struct m0_addb2_callback, ca_linkage, ca_magix,
48 M0_TL_DEFINE(callback, static, struct m0_addb2_callback);
49 
50 static bool true_matches(struct m0_addb2_philter *philter,
51  const struct m0_addb2_record *rec);
52 static bool id_matches(struct m0_addb2_philter *philter,
53  const struct m0_addb2_record *rec);
54 
56 {
57  philter_tlist_init(&src->so_philter);
58 }
59 
61 {
62  while (philter_tlist_pop(&src->so_philter) != NULL)
63  ;
64  philter_tlist_fini(&src->so_philter);
65 }
66 
68  bool (*matches)(struct m0_addb2_philter *,
69  const struct m0_addb2_record *),
70  void *datum)
71 {
72  philter->ph_matches = matches;
73  philter->ph_datum = datum;
74  philter_tlink_init(philter);
75  callback_tlist_init(&philter->ph_callback);
76 }
77 
79 {
80  while (callback_tlist_pop(&philter->ph_callback) != NULL)
81  ;
82  callback_tlist_fini(&philter->ph_callback);
83  philter_tlink_fini(philter);
84 }
85 
87  struct m0_addb2_philter *ph)
88 {
89  philter_tlink_init_at_tail(ph, &src->so_philter);
90 }
91 
93 {
94  philter_tlink_del_fini(ph);
95 }
96 
98  void (*fire)(const struct m0_addb2_source *,
99  const struct m0_addb2_philter *,
100  const struct m0_addb2_callback *,
101  const struct m0_addb2_record *),
102  void *datum)
103 {
104  callback->ca_fire = fire;
105  callback->ca_datum = datum;
106  callback_tlink_init(callback);
107 }
108 
110 {
111  callback_tlink_fini(callback);
112 }
113 
115  struct m0_addb2_callback *callback)
116 {
117  callback_tlink_init_at_tail(callback, &ph->ph_callback);
118 }
119 
121 {
122  callback_tlink_del_fini(callback);
123 }
124 
126 {
127  return &c->cu_src;
128 }
129 
130 static void philter_consume(struct m0_addb2_source *src,
131  struct m0_addb2_philter *ph,
132  const struct m0_addb2_record *rec)
133 {
134  struct m0_addb2_callback *callback;
135 
136  if (ph->ph_matches(ph, rec)) {
137  m0_tl_for(callback, &ph->ph_callback, callback) {
138  callback->ca_fire(src, ph, callback, rec);
139  } m0_tl_endfor;
140  }
141 }
142 
144  const struct m0_addb2_record *rec)
145 {
146  struct m0_addb2_philter *ph;
147  struct m0_addb2_module *am = m0_addb2_module_get();
148  int i;
149 
150  m0_tl_for(philter, &src->so_philter, ph) {
151  philter_consume(src, ph, rec);
152  } m0_tl_endfor;
153 
154  for (i = 0; i < ARRAY_SIZE(am->am_philter); ++i) {
155  if (am->am_philter[i] != NULL)
156  philter_consume(src, am->am_philter[i], rec);
157  }
158 }
159 
161 {
163 }
164 
165 void m0_addb2_philter_id_init(struct m0_addb2_philter *ph, uint64_t id)
166 {
167  m0_addb2_philter_init(ph, id_matches, (void *)id);
168 }
169 
170 static bool true_matches(struct m0_addb2_philter *philter,
171  const struct m0_addb2_record *rec)
172 {
173  return true;
174 }
175 
176 static bool id_matches(struct m0_addb2_philter *philter,
177  const struct m0_addb2_record *rec)
178 {
179  return rec->ar_val.va_id == (uint64_t)philter->ph_datum;
180 }
181 
183 {
184  struct m0_addb2_module *am = m0_addb2_module_get();
185  int i;
186 
187  for (i = 0; i < ARRAY_SIZE(am->am_philter); ++i) {
188  if (am->am_philter[i] == NULL) {
189  am->am_philter[i] = ph;
190  return;
191  }
192  }
193  M0_IMPOSSIBLE("Too many global philters.");
194 }
195 
197 {
198  struct m0_addb2_module *am = m0_addb2_module_get();
199  int i;
200 
201  for (i = 0; i < ARRAY_SIZE(am->am_philter); ++i) {
202  if (am->am_philter[i] == ph) {
203  am->am_philter[i] = NULL;
204  return;
205  }
206  }
207  M0_IMPOSSIBLE("Unknown global philter.");
208 }
209 
210 #undef M0_TRACE_SUBSYSTEM
211 
214 /*
215  * Local variables:
216  * c-indentation-style: "K&R"
217  * c-basic-offset: 8
218  * tab-width: 8
219  * fill-column: 80
220  * scroll-step: 1
221  * End:
222  */
223 /*
224  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
225  */
struct m0_tl ph_callback
Definition: consumer.h:147
void m0_addb2_callback_fini(struct m0_addb2_callback *callback)
Definition: consumer.c:109
#define NULL
Definition: misc.h:38
M0_INTERNAL struct m0_addb2_module * m0_addb2_module_get(void)
Definition: addb2.c:765
uint64_t va_id
Definition: consumer.h:103
void m0_addb2_philter_global_del(struct m0_addb2_philter *ph)
Definition: consumer.c:196
#define m0_tl_endfor
Definition: tlist.h:700
struct m0_addb2_philter * am_philter[M0_ADDB2_GLOBAL_PHILTERS]
Definition: internal.h:106
int i
Definition: dir.c:1033
M0_TL_DESCR_DEFINE(philter, "addb2 source philters", static, struct m0_addb2_philter, ph_linkage, ph_magix, M0_ADDB2_PHILTER_MAGIC, M0_ADDB2_PHILTER_HEAD_MAGIC)
void m0_addb2_callback_add(struct m0_addb2_philter *ph, struct m0_addb2_callback *callback)
Definition: consumer.c:114
void m0_addb2_philter_id_init(struct m0_addb2_philter *ph, uint64_t id)
Definition: consumer.c:165
void m0_addb2_philter_true_init(struct m0_addb2_philter *ph)
Definition: consumer.c:160
static struct m0_addb2_callback c
Definition: consumer.c:41
void m0_addb2_consume(struct m0_addb2_source *src, const struct m0_addb2_record *rec)
Definition: consumer.c:143
void m0_addb2_philter_global_add(struct m0_addb2_philter *ph)
Definition: consumer.c:182
void m0_addb2_callback_del(struct m0_addb2_callback *callback)
Definition: consumer.c:120
void m0_addb2_philter_fini(struct m0_addb2_philter *philter)
Definition: consumer.c:78
bool(* ph_matches)(struct m0_addb2_philter *ph, const struct m0_addb2_record *rec)
Definition: consumer.h:140
struct m0_addb2_value ar_val
Definition: consumer.h:114
void m0_addb2_philter_add(struct m0_addb2_source *src, struct m0_addb2_philter *ph)
Definition: consumer.c:86
void(* ca_fire)(const struct m0_addb2_source *src, const struct m0_addb2_philter *ph, const struct m0_addb2_callback *callback, const struct m0_addb2_record *rec)
Definition: consumer.h:174
M0_TL_DEFINE(philter, static, struct m0_addb2_philter)
void m0_addb2_callback_init(struct m0_addb2_callback *callback, void(*fire)(const struct m0_addb2_source *, const struct m0_addb2_philter *, const struct m0_addb2_callback *, const struct m0_addb2_record *), void *datum)
Definition: consumer.c:97
void m0_addb2_source_fini(struct m0_addb2_source *src)
Definition: consumer.c:60
void * ph_datum
Definition: consumer.h:143
void m0_addb2_philter_del(struct m0_addb2_philter *ph)
Definition: consumer.c:92
static bool true_matches(struct m0_addb2_philter *philter, const struct m0_addb2_record *rec)
Definition: consumer.c:170
static void philter_consume(struct m0_addb2_source *src, struct m0_addb2_philter *ph, const struct m0_addb2_record *rec)
Definition: consumer.c:130
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_addb2_source_init(struct m0_addb2_source *src)
Definition: consumer.c:55
void m0_addb2_philter_init(struct m0_addb2_philter *philter, bool(*matches)(struct m0_addb2_philter *, const struct m0_addb2_record *), void *datum)
Definition: consumer.c:67
struct m0_pdclust_src_addr src
Definition: fd.c:108
#define ARRAY_SIZE(a)
Definition: misc.h:45
static void(* fire)(const struct m0_addb2_source *, const struct m0_addb2_philter *, const struct m0_addb2_callback *, const struct m0_addb2_record *)
Definition: consumer.c:44
static bool id_matches(struct m0_addb2_philter *philter, const struct m0_addb2_record *rec)
Definition: consumer.c:176
struct m0_addb2_source * m0_addb2_cursor_source(struct m0_addb2_cursor *c)
Definition: consumer.c:125
#define M0_IMPOSSIBLE(fmt,...)